flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [3/9] git commit: FLUME-1508. TestFileChannel times out maven periodically.
Date Fri, 24 Aug 2012 17:23:23 GMT
FLUME-1508. TestFileChannel times out maven periodically.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e46cf805
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e46cf805
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e46cf805

Branch: refs/heads/flume-1.3.0
Commit: e46cf8050acf1624d939d6029326ffae9e9d430a
Parents: d80a7d6
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Aug 23 11:26:42 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Aug 24 10:21:42 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/TestFileChannel.java |   67 +-------
 .../apache/flume/channel/file/TestIntegration.java |  137 +++++++++++++++
 2 files changed, 141 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e46cf805/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index bca2b17..2093839 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -17,8 +17,8 @@
  * under the License.
  */
 package org.apache.flume.channel.file;
-import static org.fest.reflect.core.Reflection.field;
-import static org.fest.reflect.core.Reflection.method;
+import static org.fest.reflect.core.Reflection.*;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -29,9 +29,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -46,8 +48,6 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.LoggerSink;
-import org.apache.flume.sink.NullSink;
-import org.apache.flume.source.SequenceGeneratorSource;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -63,8 +63,6 @@ import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
 
 public class TestFileChannel {
 
@@ -659,63 +657,6 @@ public class TestFileChannel {
     fileChannel.start();
     Assert.assertTrue(!fileChannel.isOpen());
   }
-  @Test
-  public void testIntegration() throws IOException, InterruptedException {
-    // set shorter checkpoint and filesize to ensure
-    // checkpoints and rolls occur during the test
-    Map<String, String> overrides = Maps.newHashMap();
-    overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
-            String.valueOf(10L * 1000L));
-    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE,
-            String.valueOf(1024 * 1024 * 5));
-    // do reconfiguration
-    channel = createFileChannel(overrides);
-    channel.start();
-    Assert.assertTrue(channel.isOpen());
-
-    SequenceGeneratorSource source = new SequenceGeneratorSource();
-    CountingSourceRunner sourceRunner = new CountingSourceRunner(source, channel);
-
-    NullSink sink = new NullSink();
-    sink.setChannel(channel);
-    CountingSinkRunner sinkRunner = new CountingSinkRunner(sink);
-
-    sinkRunner.start();
-    sourceRunner.start();
-    Thread.sleep(TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES));
-    // shutdown source
-    sourceRunner.shutdown();
-    while(sourceRunner.isAlive()) {
-      Thread.sleep(10L);
-    }
-    // wait for queue to clear
-    while(channel.getDepth() > 0) {
-      Thread.sleep(10L);
-    }
-    // shutdown size
-    sinkRunner.shutdown();
-    // wait a few seconds
-    Thread.sleep(TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS));
-    List<File> logs = Lists.newArrayList();
-    for (int i = 0; i < dataDirs.length; i++) {
-      logs.addAll(LogUtils.getLogs(dataDirs[i]));
-    }
-    LOG.info("Total Number of Logs = " + logs.size());
-    for(File logFile : logs) {
-      LOG.info("LogFile = " + logFile);
-    }
-    LOG.info("Source processed " + sinkRunner.getCount());
-    LOG.info("Sink processed " + sourceRunner.getCount());
-    for(Exception ex : sourceRunner.getErrors()) {
-      LOG.warn("Source had error", ex);
-    }
-    for(Exception ex : sinkRunner.getErrors()) {
-      LOG.warn("Sink had error", ex);
-    }
-    Assert.assertEquals(sinkRunner.getCount(), sinkRunner.getCount());
-    Assert.assertEquals(Collections.EMPTY_LIST, sinkRunner.getErrors());
-    Assert.assertEquals(Collections.EMPTY_LIST, sourceRunner.getErrors());
-  }
   /**
    * This is regression test with files generated by a file channel
    * with the FLUME-1432 patch.

http://git-wip-us.apache.org/repos/asf/flume/blob/e46cf805/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
new file mode 100644
index 0000000..4e2f940
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class TestIntegration {
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(TestIntegration.class);
+  private FileChannel channel;
+  private File baseDir;
+  private File checkpointDir;
+  private File[] dataDirs;
+  private String dataDir;
+
+  @Before
+  public void setup() {
+    baseDir = Files.createTempDir();
+    checkpointDir = new File(baseDir, "chkpt");
+    Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
+    dataDirs = new File[3];
+    dataDir = "";
+    for (int i = 0; i < dataDirs.length; i++) {
+      dataDirs[i] = new File(baseDir, "data" + (i+1));
+      Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory());
+      dataDir += dataDirs[i].getAbsolutePath() + ",";
+    }
+    dataDir = dataDir.substring(0, dataDir.length() - 1);
+  }
+  @After
+  public void teardown() {
+    if(channel != null && channel.isOpen()) {
+      channel.stop();
+    }
+    FileUtils.deleteQuietly(baseDir);
+  }
+  @Test
+  public void testIntegration() throws IOException, InterruptedException {
+    // set shorter checkpoint and filesize to ensure
+    // checkpoints and rolls occur during the test
+    Context context = new Context();
+    context.put(FileChannelConfiguration.CHECKPOINT_DIR,
+        checkpointDir.getAbsolutePath());
+    context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
+    context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
+    // Set checkpoint for 5 seconds otherwise test will run out of memory
+    context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000");
+    context.put(FileChannelConfiguration.MAX_FILE_SIZE,
+            String.valueOf(1024 * 1024 * 5));
+    // do reconfiguration
+    channel = new FileChannel();
+    channel.setName("FileChannel-" + UUID.randomUUID());
+    Configurables.configure(channel, context);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+
+    SequenceGeneratorSource source = new SequenceGeneratorSource();
+    CountingSourceRunner sourceRunner = new CountingSourceRunner(source, channel);
+
+    NullSink sink = new NullSink();
+    sink.setChannel(channel);
+    CountingSinkRunner sinkRunner = new CountingSinkRunner(sink);
+
+    sinkRunner.start();
+    sourceRunner.start();
+    TimeUnit.SECONDS.sleep(30);
+    // shutdown source
+    sourceRunner.shutdown();
+    while(sourceRunner.isAlive()) {
+      Thread.sleep(10L);
+    }
+    // wait for queue to clear
+    while(channel.getDepth() > 0) {
+      Thread.sleep(10L);
+    }
+    // shutdown size
+    sinkRunner.shutdown();
+    // wait a few seconds
+    TimeUnit.SECONDS.sleep(5);
+    List<File> logs = Lists.newArrayList();
+    for (int i = 0; i < dataDirs.length; i++) {
+      logs.addAll(LogUtils.getLogs(dataDirs[i]));
+    }
+    LOG.info("Total Number of Logs = " + logs.size());
+    for(File logFile : logs) {
+      LOG.info("LogFile = " + logFile);
+    }
+    LOG.info("Source processed " + sinkRunner.getCount());
+    LOG.info("Sink processed " + sourceRunner.getCount());
+    for(Exception ex : sourceRunner.getErrors()) {
+      LOG.warn("Source had error", ex);
+    }
+    for(Exception ex : sinkRunner.getErrors()) {
+      LOG.warn("Sink had error", ex);
+    }
+    Assert.assertEquals(sinkRunner.getCount(), sinkRunner.getCount());
+    Assert.assertEquals(Collections.EMPTY_LIST, sinkRunner.getErrors());
+    Assert.assertEquals(Collections.EMPTY_LIST, sourceRunner.getErrors());
+  }
+}


Mime
View raw message