flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2042. log4jappender timeout should be configurable
Date Sat, 11 May 2013 21:32:17 GMT
Updated Branches:
  refs/heads/trunk 51da4db4d -> e9719a889


FLUME-2042. log4jappender timeout should be configurable

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e9719a88
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e9719a88
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e9719a88

Branch: refs/heads/trunk
Commit: e9719a8897b7cce196aabbac84c207a1c906886f
Parents: 51da4db
Author: Mike Percy <mpercy@apache.org>
Authored: Sat May 11 14:30:48 2013 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Sat May 11 14:31:44 2013 -0700

----------------------------------------------------------------------
 .../log4jappender/LoadBalancingLog4jAppender.java  |    9 ++-
 .../flume/clients/log4jappender/Log4jAppender.java |   23 +++++-
 .../TestLoadBalancingLog4jAppender.java            |   30 +++++++
 .../clients/log4jappender/TestLog4jAppender.java   |   68 +++++++++++++--
 4 files changed, 120 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e9719a88/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
index 3172e21..713234f 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
@@ -121,7 +121,8 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
   @Override
   public void activateOptions() throws FlumeException {
     try {
-      final Properties properties = getProperties(hosts, selector, maxBackoff);
+      final Properties properties = getProperties(hosts, selector,
+        maxBackoff, getTimeout());
       rpcClient = RpcClientFactory.getInstance(properties);
       if(layout != null) {
         layout.activateOptions();
@@ -139,7 +140,7 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
   }
 
   private Properties getProperties(String hosts, String selector,
-      String maxBackoff) throws FlumeException {
+      String maxBackoff, long timeout) throws FlumeException {
 
     if (StringUtils.isEmpty(hosts)) {
       throw new FlumeException("hosts must not be null");
@@ -172,6 +173,10 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
           String.valueOf(true));
       props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff);
     }
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
+      String.valueOf(timeout));
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+      String.valueOf(timeout));
     return props;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e9719a88/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
index 0ba56d3..532b761 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
@@ -21,11 +21,13 @@ package org.apache.flume.clients.log4jappender;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.event.EventBuilder;
 
@@ -63,6 +65,8 @@ public class Log4jAppender extends AppenderSkeleton {
   private String hostname;
   private int port;
   private boolean unsafeMode = false;
+  private long timeout = RpcClientConfigurationConstants
+    .DEFAULT_REQUEST_TIMEOUT_MILLIS;
 
   RpcClient rpcClient = null;
 
@@ -217,6 +221,15 @@ public class Log4jAppender extends AppenderSkeleton {
     return unsafeMode;
   }
 
+  public void setTimeout(long timeout) {
+    this.timeout = timeout;
+  }
+
+  public long getTimeout() {
+    return this.timeout;
+  }
+
+
   /**
    * Activate the options set using <tt>setPort()</tt>
    * and <tt>setHostname()</tt>
@@ -226,8 +239,16 @@ public class Log4jAppender extends AppenderSkeleton {
    */
   @Override
   public void activateOptions() throws FlumeException {
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
+      hostname + ":" + port);
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
+     String.valueOf(timeout));
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+      String.valueOf(timeout));
     try {
-      rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
+      rpcClient = RpcClientFactory.getInstance(props);
       if (layout != null) {
         layout.activateOptions();
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/e9719a88/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
index 103bcb6..267ac1d 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
@@ -62,10 +62,16 @@ public class TestLoadBalancingLog4jAppender {
   private Channel ch;
   private ChannelSelector rcs;
   private Logger fixture;
+  private boolean slowDown = false;
 
   @Before
   public void initiate() throws InterruptedException{
     ch = new MemoryChannel();
+    configureChannel();
+
+  }
+
+  private void configureChannel() {
     Configurables.configure(ch, new Context());
 
     List<Channel> channels = new ArrayList<Channel>();
@@ -170,6 +176,27 @@ public class TestLoadBalancingLog4jAppender {
 
   }
 
+  @Test (expected = EventDeliveryException.class)
+  public void testTimeout() throws Throwable {
+    File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
+      .getClassLoader()
+      .getResource("flume-loadbalancinglog4jtest.properties")
+      .getFile());
+
+    ch = new TestLog4jAppender.SlowMemoryChannel(2000);
+    configureChannel();
+    slowDown = true;
+    startSources(TESTFILE, false, new int[]{25430, 25431, 25432});
+    int level = 20000;
+    String msg = "This is log message number" + String.valueOf(level);
+    try {
+      fixture.log(Level.toLevel(level), msg);
+    } catch (FlumeException ex) {
+      throw ex.getCause();
+    }
+
+  }
+
   @Test(expected = EventDeliveryException.class)
   public void testRandomBackoffNotUnsafeMode() throws Throwable {
     File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
@@ -271,6 +298,9 @@ public class TestLoadBalancingLog4jAppender {
     props.load(reader);
     props.setProperty("log4j.appender.out2.UnsafeMode",
       String.valueOf(unsafeMode));
+    if(slowDown) {
+      props.setProperty("log4j.appender.out2.Timeout", String.valueOf(1000));
+    }
     PropertyConfigurator.configure(props);
     fixture = LogManager.getLogger(TestLoadBalancingLog4jAppender.class);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/e9719a88/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
index 211837b..1b840f3 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
@@ -62,6 +63,16 @@ public class TestLog4jAppender{
     context.put("bind", "localhost");
     Configurables.configure(source, context);
 
+    File TESTFILE = new File(
+        TestLog4jAppender.class.getClassLoader()
+            .getResource("flume-log4jtest.properties").getFile());
+    FileReader reader = new FileReader(TESTFILE);
+    props = new Properties();
+    props.load(reader);
+    reader.close();
+  }
+
+  private void configureSource() {
     List<Channel> channels = new ArrayList<Channel>();
     channels.add(ch);
 
@@ -71,16 +82,10 @@ public class TestLog4jAppender{
     source.setChannelProcessor(new ChannelProcessor(rcs));
 
     source.start();
-    File TESTFILE = new File(
-        TestLog4jAppender.class.getClassLoader()
-            .getResource("flume-log4jtest.properties").getFile());
-    FileReader reader = new FileReader(TESTFILE);
-    props = new Properties();
-    props.load(reader);
-    reader.close();
   }
   @Test
   public void testLog4jAppender() throws IOException {
+    configureSource();
     PropertyConfigurator.configure(props);
     Logger logger = LogManager.getLogger(TestLog4jAppender.class);
     for(int count = 0; count <= 1000; count++){
@@ -121,6 +126,7 @@ public class TestLog4jAppender{
 
   @Test
   public void testLog4jAppenderFailureUnsafeMode() throws Throwable {
+    configureSource();
     props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(true));
     PropertyConfigurator.configure(props);
     Logger logger = LogManager.getLogger(TestLog4jAppender.class);
@@ -131,6 +137,7 @@ public class TestLog4jAppender{
 
   @Test(expected = EventDeliveryException.class)
   public void testLog4jAppenderFailureNotUnsafeMode() throws Throwable {
+    configureSource();
     PropertyConfigurator.configure(props);
     Logger logger = LogManager.getLogger(TestLog4jAppender.class);
     source.stop();
@@ -163,6 +170,7 @@ public class TestLog4jAppender{
 
   @Test
   public void testLayout() throws IOException {
+    configureSource();
     props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout");
     props.put("log4j.appender.out2.layout.ConversionPattern",
         "%-5p [%t]: %m%n");
@@ -214,6 +222,34 @@ public class TestLog4jAppender{
 
   }
 
+  @Test(expected = EventDeliveryException.class)
+  public void testSlowness() throws Throwable {
+    ch = new SlowMemoryChannel(2000);
+    Configurables.configure(ch, new Context());
+    configureSource();
+    props.put("log4j.appender.out2.Timeout", "1000");
+    props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout");
+    props.put("log4j.appender.out2.layout.ConversionPattern",
+      "%-5p [%t]: %m%n");
+    PropertyConfigurator.configure(props);
+    Logger logger = LogManager.getLogger(TestLog4jAppender.class);
+    Thread.currentThread().setName("Log4jAppenderTest");
+    int level = 10000;
+    String msg = "This is log message number" + String.valueOf(1);
+    try {
+      logger.log(Level.toLevel(level), msg);
+    } catch (FlumeException ex) {
+      throw ex.getCause();
+    }
+  }
+
+  @Test // Should not throw
+  public void testSlownessUnsafeMode() throws Throwable {
+    props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(true));
+    testSlowness();
+  }
+
+
   @After
   public void cleanUp(){
     source.stop();
@@ -221,4 +257,22 @@ public class TestLog4jAppender{
     props.clear();
   }
 
+
+  static class SlowMemoryChannel extends MemoryChannel {
+    private final int slowTime;
+
+    public SlowMemoryChannel(int slowTime) {
+      this.slowTime = slowTime;
+    }
+
+    public void put(Event e) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(slowTime);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+      super.put(e);
+    }
+  }
+
 }


Mime
View raw message