flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1335364 - in /incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src: main/java/org/apache/flume/channel/recoverable/memory/wal/ test/java/org/apache/flume/channel/recoverable/memory/
Date Tue, 08 May 2012 07:29:52 GMT
Author: arvind
Date: Tue May  8 07:29:52 2012
New Revision: 1335364

URL: http://svn.apache.org/viewvc?rev=1335364&view=rev
Log:
FLUME-1188. TestRecoverableMemoryChannel.testThreaded can fail sometimes.

(Brock Noland via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java?rev=1335364&r1=1335363&r2=1335364&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
Tue May  8 07:29:52 2012
@@ -147,7 +147,7 @@ public class WAL<T extends Writable> imp
     buffer.append("rollSize = ").append(rollSize).append(", ");
     buffer.append("maxLogsSize = ").append(maxLogsSize).append(", ");
     buffer.append("minLogRentionPeriod = ").append(minLogRentionPeriod).append(", ");
-    buffer.append("workerInterval = ").append(workerInterval).append("\n");
+    buffer.append("workerInterval = ").append(workerInterval);
     LOG.info("WAL Parameters: " + buffer);
 
     File clazzNamePath = new File(path, "clazz");

Modified: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java?rev=1335364&r1=1335363&r2=1335364&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
Tue May  8 07:29:52 2012
@@ -52,7 +52,6 @@ import com.google.common.io.Files;
 
 public class TestRecoverableMemoryChannel {
 
-  @SuppressWarnings("unused")
   private static final Logger logger = LoggerFactory
   .getLogger(TestRecoverableMemoryChannel.class);
 
@@ -204,12 +203,13 @@ public class TestRecoverableMemoryChanne
     putEvents(channel, "unbatched", 1, 5);
     putEvents(channel, "batched", 5, 5);
   }
-
   @Test
   public void testThreaded() throws IOException, InterruptedException {
     int numThreads = 10;
-    final CountDownLatch startLatch = new CountDownLatch(numThreads * 2);
-    final CountDownLatch stopLatch = new CountDownLatch(numThreads * 2);
+    final CountDownLatch producerStopLatch = new CountDownLatch(numThreads);
+    // due to limited capacity we must wait for consumers to start to put
+    final CountDownLatch consumerStartLatch = new CountDownLatch(numThreads);
+    final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads);
     final List<Exception> errors = Collections
         .synchronizedList(new ArrayList<Exception>());
     final List<String> expected = Collections
@@ -222,17 +222,18 @@ public class TestRecoverableMemoryChanne
         @Override
         public void run() {
           try {
-            startLatch.countDown();
-            startLatch.await();
+            consumerStartLatch.await();
             if (id % 2 == 0) {
               expected.addAll(putEvents(channel, Integer.toString(id), 1, 5));
             } else {
               expected.addAll(putEvents(channel, Integer.toString(id), 5, 5));
             }
+            logger.info("Completed some puts " + expected.size());
           } catch (Exception e) {
+            logger.error("Error doing puts", e);
             errors.add(e);
           } finally {
-            stopLatch.countDown();
+            producerStopLatch.countDown();
           }
         }
       };
@@ -245,24 +246,36 @@ public class TestRecoverableMemoryChanne
         @Override
         public void run() {
           try {
-            startLatch.countDown();
-            startLatch.await();
-            if (id % 2 == 0) {
-              actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
+            consumerStartLatch.countDown();
+            consumerStartLatch.await();
+            while(!producerStopLatch.await(1, TimeUnit.SECONDS) ||
+                expected.size() > actual.size()) {
+              if (id % 2 == 0) {
+                actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
+              } else {
+                actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
+              }
+            }
+            if(actual.isEmpty()) {
+              logger.error("Found nothing!");
             } else {
-              actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
+              logger.info("Completed some takes " + actual.size());
             }
           } catch (Exception e) {
+            logger.error("Error doing takes", e);
             errors.add(e);
           } finally {
-            stopLatch.countDown();
+            consumerStopLatch.countDown();
           }
         }
       };
       t.setDaemon(true);
       t.start();
     }
-    Assert.assertTrue(stopLatch.await(5, TimeUnit.SECONDS));
+    Assert.assertTrue("Timed out waiting for producers",
+        producerStopLatch.await(30, TimeUnit.SECONDS));
+    Assert.assertTrue("Timed out waiting for consumer",
+        consumerStopLatch.await(30, TimeUnit.SECONDS));
     Assert.assertEquals(Collections.EMPTY_LIST, errors);
     Collections.sort(expected);
     Collections.sort(actual);



Mime
View raw message