flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [10/16] git commit: FLUME-1571: Channels should check for positive capacity and transaction capacity values
Date Fri, 21 Dec 2012 23:50:25 GMT
FLUME-1571: Channels should check for positive capacity and transaction capacity values

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/09dfc2ab
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/09dfc2ab
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/09dfc2ab

Branch: refs/heads/flume-1.3.0
Commit: 09dfc2ab77ab898fd38094b3c8c2358f7fd55ffd
Parents: 16b9cb5
Author: Brock Noland <brock@apache.org>
Authored: Tue Dec 11 14:10:15 2012 -0600
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Dec 20 00:13:43 2012 -0800

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |   33 +++++++++++++--
 .../channel/file/TestCheckpointRebuilder.java      |    2 +
 .../apache/flume/channel/file/TestFileChannel.java |   30 +++++++++++++
 .../file/TestFileChannelFormatRegression.java      |    2 +
 .../flume/channel/file/TestFileChannelRestart.java |    1 +
 .../org/apache/flume/channel/file/TestUtils.java   |   16 ++++++-
 .../file/encryption/TestFileChannelEncryption.java |    6 +++
 .../org/apache/flume/channel/MemoryChannel.java    |   18 ++++++++-
 .../apache/flume/channel/TestMemoryChannel.java    |   22 ++++++++++
 9 files changed, 122 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 4bf480b..19c91b0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -72,10 +72,10 @@ public class FileChannel extends BasicChannelSemantics {
   private static final Logger LOG = LoggerFactory
       .getLogger(FileChannel.class);
 
-  private int capacity;
+  private Integer capacity = 0;
   private int keepAlive;
-  private int transactionCapacity;
-  private long checkpointInterval;
+  private Integer transactionCapacity = 0;
+  private Long checkpointInterval = 0L;
   private long maxFileSize;
   private long minimumRequiredSpace;
   private File checkpointDir;
@@ -147,6 +147,11 @@ public class FileChannel extends BasicChannelSemantics {
 
     int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY,
         FileChannelConfiguration.DEFAULT_CAPACITY);
+    if(newCapacity <= 0 && capacity == 0) {
+      newCapacity = FileChannelConfiguration.DEFAULT_CAPACITY;
+      LOG.warn("Invalid capacity specified, initializing channel to "
+              + "default capacity of {}", newCapacity);
+    }
     if(capacity > 0 && newCapacity != capacity) {
       LOG.warn("Capacity of this channel cannot be sized on the fly due " +
           "the requirement we have enough DirectMemory for the queue and " +
@@ -163,9 +168,29 @@ public class FileChannel extends BasicChannelSemantics {
         context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY,
             FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY);
 
+    if(transactionCapacity <= 0) {
+      transactionCapacity =
+              FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY;
+      LOG.warn("Invalid transaction capacity specified, " +
+          "initializing channel to default " +
+          "capacity of {}", transactionCapacity);
+    }
+
+    Preconditions.checkState(transactionCapacity <= capacity,
+        "File Channel transaction capacity cannot be greater than the " +
+            "capacity of the channel.");
+
     checkpointInterval =
-        context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
+            context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
             FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
+    if (checkpointInterval <= 0) {
+      LOG.warn("Checkpoint interval is invalid: " + checkpointInterval
+              + ", using default: "
+              + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
+
+      checkpointInterval =
+              FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL;
+    }
 
     // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
     maxFileSize = Math.min(

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
index ffc4623..536af54 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -52,6 +52,8 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY,
         String.valueOf(50));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(50));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 87a0a3f..0f7d14d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -67,6 +67,28 @@ public class TestFileChannel extends TestFileChannelBase {
   public void teardown() {
     super.teardown();
   }
+
+  @Test
+  public void testNegativeCapacities() {
+    Map<String, String> parms = Maps.newHashMap();
+    parms.put(FileChannelConfiguration.CAPACITY, "-3");
+    parms.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "-1");
+    parms.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "-2");
+    FileChannel channel = createFileChannel(parms);
+
+    Assert.assertTrue(field("capacity")
+            .ofType(Integer.class)
+            .in(channel).get() > 0);
+
+    Assert.assertTrue(field("transactionCapacity")
+            .ofType(Integer.class)
+            .in(channel).get() > 0);
+
+    Assert.assertTrue(field("checkpointInterval")
+            .ofType(Long.class)
+            .in(channel).get() > 0);
+  }
+
   @Test
   public void testFailAfterTakeBeforeCommit() throws Throwable {
     final FileChannel channel = createFileChannel();
@@ -223,6 +245,8 @@ public class TestFileChannel extends TestFileChannelBase {
   public void testCapacity() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(5));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(5));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -259,6 +283,8 @@ public class TestFileChannel extends TestFileChannelBase {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(10L));
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(10));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -409,6 +435,8 @@ public class TestFileChannel extends TestFileChannelBase {
   public void testPutForceCheckpointCommitReplay() throws Exception{
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(2));
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
     FileChannel channel = createFileChannel(overrides);
     channel.start();
@@ -433,6 +461,8 @@ public class TestFileChannel extends TestFileChannelBase {
   public void testPutCheckpointCommitCheckpointReplay() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(2));
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
     FileChannel channel = createFileChannel(overrides);
     channel.start();

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
index 184f956..c95122b 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
@@ -65,6 +65,8 @@ public class TestFileChannelFormatRegression extends TestFileChannelBase
{
     }
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(10));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index f548f31..3d5bf59 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -371,6 +371,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
       throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, "10");
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 9978f86..ba653e6 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -142,7 +142,15 @@ public class TestUtils {
       try {
         transaction.begin();
         for (int j = 0; j < batchSize; j++) {
-          Event event = channel.take();
+          Event event = null;
+          try {
+            event = channel.take();
+          } catch (ChannelException ex) {
+            Assert.assertTrue(ex.getMessage().startsWith(
+                "Take list for FileBackedTransaction, capacity"));
+            transaction.commit();
+            return result;
+          }
           if (event == null) {
             transaction.commit();
             return result;
@@ -194,11 +202,13 @@ public class TestUtils {
           result.addAll(batch);
         }
       } catch (ChannelException e) {
-        Assert.assertEquals("The channel has reached it's capacity. This might "
+        Assert.assertTrue(("The channel has reached it's capacity. This might "
             + "be the result of a sink on the channel having too low of batch "
             + "size, a downstream system running slower than normal, or that "
             + "the channel capacity is just too low. [channel="
-            + channel.getName()+"]", e.getMessage());
+            + channel.getName() + "]").equals(e.getMessage())
+            || e.getMessage().startsWith("Put queue for FileBackedTransaction " +
+            "of capacity "));
       }
     }
     return result;

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
index d2f5208..6ea1216 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
@@ -72,6 +72,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
   private Map<String, String> getOverrides() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(100));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(100));
     return overrides;
   }
   private Map<String, String> getOverridesForEncryption() throws Exception {
@@ -98,6 +100,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     int numThreads = 20;
     Map<String, String> overrides = getOverridesForEncryption();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(100));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -133,6 +137,8 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     int numThreads = 20;
     Map<String, String> overrides = getOverridesForEncryption();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
+        String.valueOf(100));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
index dfc289e..a25e639 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
@@ -224,15 +224,31 @@ public class MemoryChannel extends BasicChannelSemantics {
       capacity = context.getInteger("capacity", defaultCapacity);
     } catch(NumberFormatException e) {
       capacity = defaultCapacity;
+      LOGGER.warn("Invalid capacity specified, initializing channel to "
+          + "default capacity of {}", defaultCapacity);
     }
 
+    if (capacity <= 0) {
+      capacity = defaultCapacity;
+      LOGGER.warn("Invalid capacity specified, initializing channel to "
+          + "default capacity of {}", defaultCapacity);
+    }
     try {
       transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
     } catch(NumberFormatException e) {
       transCapacity = defaultTransCapacity;
+      LOGGER.warn("Invalid transation capacity specified, initializing channel"
+          + " to default capacity of {}", defaultTransCapacity);
     }
 
-    Preconditions.checkState(transCapacity <= capacity);
+    if (transCapacity <= 0) {
+      transCapacity = defaultTransCapacity;
+      LOGGER.warn("Invalid transation capacity specified, initializing channel"
+          + " to default capacity of {}", defaultTransCapacity);
+    }
+    Preconditions.checkState(transCapacity <= capacity,
+        "Transaction Capacity of Memory Channel cannot be higher than " +
+            "the capacity.");
 
     try {
       byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage);

http://git-wip-us.apache.org/repos/asf/flume/blob/09dfc2ab/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
index e1a61c2..a78581a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -21,6 +21,7 @@ package org.apache.flume.channel;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
@@ -33,6 +34,8 @@ import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import static org.fest.reflect.core.Reflection.*;
+
 
 public class TestMemoryChannel {
 
@@ -439,4 +442,23 @@ public class TestMemoryChannel {
 
 
   }
+
+  @Test
+  public void testNegativeCapacities() {
+    Context context = new Context();
+    Map<String, String> parms = new HashMap<String, String>();
+    parms.put("capacity", "-3");
+    parms.put("transactionCapacity", "-1");
+    context.putAll(parms);
+    Configurables.configure(channel, context);
+
+    Assert.assertTrue(field("queue")
+            .ofType(LinkedBlockingDeque.class)
+            .in(channel).get()
+            .remainingCapacity() > 0);
+
+    Assert.assertTrue(field("transCapacity")
+            .ofType(Integer.class)
+            .in(channel).get() > 0);
+  }
 }


Mime
View raw message