hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1150910 - in /hadoop/common/trunk/common: CHANGES.txt src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
Date Mon, 25 Jul 2011 21:10:25 GMT
Author: todd
Date: Mon Jul 25 21:10:24 2011
New Revision: 1150910

URL: http://svn.apache.org/viewvc?rev=1150910&view=rev
Log:
HADOOP-7298. Add test utility for writing multi-threaded tests. Contributed by Todd Lipcon
and Harsh J Chouraria.

Added:
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
Modified:
    hadoop/common/trunk/common/CHANGES.txt

Modified: hadoop/common/trunk/common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1150910&r1=1150909&r2=1150910&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Mon Jul 25 21:10:24 2011
@@ -280,6 +280,9 @@ Trunk (unreleased changes)
     HADOOP-7463. Adding a configuration parameter to SecurityInfo interface.
     (mahadev)
 
+    HADOOP-7298. Add test utility for writing multi-threaded tests. (todd and
+    Harsh J Chouraria via todd)
+
   OPTIMIZATIONS
   
     HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole

Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1150910&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
(added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/MultithreadedTestUtil.java
Mon Jul 25 21:10:24 2011
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A utility to easily test threaded/synchronized code.
+ * Utility works by letting you add threads that do some work to a
+ * test context object, and then lets you kick them all off to stress test
+ * your parallel code.
+ *
+ * Also propagates thread exceptions back to the runner, to let you verify.
+ *
+ * An example:
+ *
+ * <code>
+ *  final AtomicInteger threadsRun = new AtomicInteger();
+ *
+ *  TestContext ctx = new TestContext();
+ *  // Add 3 threads to test.
+ *  for (int i = 0; i < 3; i++) {
+ *    ctx.addThread(new TestingThread(ctx) {
+ *      @Override
+ *      public void doWork() throws Exception {
+ *        threadsRun.incrementAndGet();
+ *      }
+ *    });
+ *  }
+ *  ctx.startThreads();
+ *  // Set a timeout period for threads to complete.
+ *  ctx.waitFor(30000);
+ *  assertEquals(3, threadsRun.get());
+ * </code>
+ *
+ * For repetitive actions, use the {@link MultithreadedTestUtil.RepeatingThread}
+ * instead.
+ *
+ * (More examples can be found in {@link TestMultithreadedTestUtil})
+ */
+public abstract class MultithreadedTestUtil {
+
+  public static final Log LOG =
+    LogFactory.getLog(MultithreadedTestUtil.class);
+
+  /**
+   * TestContext is used to setup the multithreaded test runner.
+   * It lets you add threads, run them, wait upon or stop them.
+   */
+  public static class TestContext {
+    private Throwable err = null;
+    private boolean stopped = false;
+    private Set<TestingThread> testThreads = new HashSet<TestingThread>();
+    private Set<TestingThread> finishedThreads = new HashSet<TestingThread>();
+
+    /**
+     * Check if the context can run threads.
+     * Can't if its been stopped and contains an error.
+     * @return true if it can run, false if it can't.
+     */
+    public synchronized boolean shouldRun()  {
+      return !stopped && err == null;
+    }
+
+    /**
+     * Add a thread to the context for running.
+     * Threads can be of type {@link MultithreadedTestUtil.TestingThread}
+     * or {@link MultithreadedTestUtil.RepeatingTestThread}
+     * or other custom derivatives of the former.
+     * @param t the thread to add for running.
+     */
+    public void addThread(TestingThread t) {
+      testThreads.add(t);
+    }
+
+    /**
+     * Starts all test threads that have been added so far.
+     */
+    public void startThreads() {
+      for (TestingThread t : testThreads) {
+        t.start();
+      }
+    }
+
+    /**
+     * Waits for threads to finish or error out.
+     * @param millis the number of milliseconds to wait
+     * for threads to complete.
+     * @throws Exception if one or more of the threads
+     * have thrown up an error.
+     */
+    public synchronized void waitFor(long millis) throws Exception {
+      long endTime = System.currentTimeMillis() + millis;
+      while (shouldRun() &&
+             finishedThreads.size() < testThreads.size()) {
+        long left = endTime - System.currentTimeMillis();
+        if (left <= 0) break;
+        checkException();
+        wait(left);
+      }
+      checkException();
+    }
+
+    /**
+     * Checks for thread exceptions, and if they've occurred
+     * throws them as RuntimeExceptions in a deferred manner.
+     */
+    private synchronized void checkException() throws Exception {
+      if (err != null) {
+        throw new RuntimeException("Deferred", err);
+      }
+    }
+
+    /**
+     * Called by {@link MultithreadedTestUtil.TestingThread}s to signal
+     * a failed thread.
+     * @param t the thread that failed.
+     */
+    public synchronized void threadFailed(Throwable t) {
+      if (err == null) err = t;
+      LOG.error("Failed!", err);
+      notify();
+    }
+
+    /**
+     * Called by {@link MultithreadedTestUtil.TestingThread}s to signal
+     * a successful completion.
+     * @param t the thread that finished.
+     */
+    public synchronized void threadDone(TestingThread t) {
+      finishedThreads.add(t);
+      notify();
+    }
+
+    /**
+     * Returns after stopping all threads by joining them back.
+     * @throws Exception in case a thread terminated with a failure.
+     */
+    public void stop() throws Exception {
+      synchronized (this) {
+        stopped = true;
+      }
+      for (TestingThread t : testThreads) {
+        t.join();
+      }
+      checkException();
+    }
+  }
+
+  /**
+   * A thread that can be added to a test context, and properly
+   * passes exceptions through.
+   */
+  public static abstract class TestingThread extends Thread {
+    protected final TestContext ctx;
+    protected boolean stopped;
+
+    public TestingThread(TestContext ctx) {
+      this.ctx = ctx;
+    }
+
+    public void run() {
+      try {
+        doWork();
+      } catch (Throwable t) {
+        ctx.threadFailed(t);
+      }
+      ctx.threadDone(this);
+    }
+
+    /**
+     * User method to add any code to test thread behavior of.
+     * @throws Exception throw an exception if a failure has occurred.
+     */
+    public abstract void doWork() throws Exception;
+
+    protected void stopTestThread() {
+      this.stopped = true;
+    }
+  }
+
+  /**
+   * A test thread that performs a repeating operation.
+   */
+  public static abstract class RepeatingTestThread extends TestingThread {
+    public RepeatingTestThread(TestContext ctx) {
+      super(ctx);
+    }
+
+    /**
+     * Repeats a given user action until the context is asked to stop
+     * or meets an error.
+     */
+    public final void doWork() throws Exception {
+      while (ctx.shouldRun() && !stopped) {
+        doAnAction();
+      }
+    }
+
+    /**
+     * User method for any code to test repeating behavior of (as threads).
+     * @throws Exception throw an exception if a failure has occured.
+     */
+    public abstract void doAnAction() throws Exception;
+  }
+}

Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java?rev=1150910&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
(added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/test/TestMultithreadedTestUtil.java
Mon Jul 25 21:10:24 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+
+public class TestMultithreadedTestUtil {
+
+  private static final String FAIL_MSG =
+    "Inner thread fails an assert";
+
+  @Test
+  public void testNoErrors() throws Exception {
+    final AtomicInteger threadsRun = new AtomicInteger();
+
+    TestContext ctx = new TestContext();
+    for (int i = 0; i < 3; i++) {
+      ctx.addThread(new TestingThread(ctx) {
+        @Override
+        public void doWork() throws Exception {
+          threadsRun.incrementAndGet();
+        }
+      });
+    }
+    assertEquals(0, threadsRun.get());
+    ctx.startThreads();
+    long st = System.currentTimeMillis();
+    ctx.waitFor(30000);
+    long et = System.currentTimeMillis();
+
+    // All threads should have run
+    assertEquals(3, threadsRun.get());
+    // Test shouldn't have waited the full 30 seconds, since
+    // the threads exited faster than that.
+    assertTrue("Test took " + (et - st) + "ms",
+        et - st < 5000);
+  }
+
+  @Test
+  public void testThreadFails() throws Exception {
+    TestContext ctx = new TestContext();
+    ctx.addThread(new TestingThread(ctx) {
+      @Override
+      public void doWork() throws Exception {
+        fail(FAIL_MSG);
+      }
+    });
+    ctx.startThreads();
+    long st = System.currentTimeMillis();
+    try {
+      ctx.waitFor(30000);
+      fail("waitFor did not throw");
+    } catch (RuntimeException rte) {
+      // expected
+      assertEquals(FAIL_MSG, rte.getCause().getMessage());
+    }
+    long et = System.currentTimeMillis();
+    // Test shouldn't have waited the full 30 seconds, since
+    // the thread throws faster than that
+    assertTrue("Test took " + (et - st) + "ms",
+        et - st < 5000);
+  }
+
+  @Test
+  public void testThreadThrowsCheckedException() throws Exception {
+    TestContext ctx = new TestContext();
+    ctx.addThread(new TestingThread(ctx) {
+      @Override
+      public void doWork() throws Exception {
+        throw new IOException("my ioe");
+      }
+    });
+    ctx.startThreads();
+    long st = System.currentTimeMillis();
+    try {
+      ctx.waitFor(30000);
+      fail("waitFor did not throw");
+    } catch (RuntimeException rte) {
+      // expected
+      assertEquals("my ioe", rte.getCause().getMessage());
+    }
+    long et = System.currentTimeMillis();
+    // Test shouldn't have waited the full 30 seconds, since
+    // the thread throws faster than that
+    assertTrue("Test took " + (et - st) + "ms",
+        et - st < 5000);
+  }
+
+  @Test
+  public void testRepeatingThread() throws Exception {
+    final AtomicInteger counter = new AtomicInteger();
+
+    TestContext ctx = new TestContext();
+    ctx.addThread(new RepeatingTestThread(ctx) {
+      @Override
+      public void doAnAction() throws Exception {
+        counter.incrementAndGet();
+      }
+    });
+    ctx.startThreads();
+    long st = System.currentTimeMillis();
+    ctx.waitFor(3000);
+    ctx.stop();
+    long et = System.currentTimeMillis();
+    long elapsed = et - st;
+
+    // Test should have waited just about 3 seconds
+    assertTrue("Test took " + (et - st) + "ms",
+        Math.abs(elapsed - 3000) < 500);
+    // Counter should have been incremented lots of times in 3 full seconds
+    assertTrue("Counter value = " + counter.get(),
+        counter.get() > 1000);
+  }
+
+}



Mime
View raw message