flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1980. Log4jAppender should optionally drop events if append fails.
Date Fri, 10 May 2013 23:03:32 GMT
Updated Branches:
  refs/heads/trunk 6b9103712 -> 400403267


FLUME-1980. Log4jAppender should optionally drop events if append fails.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/40040326
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/40040326
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/40040326

Branch: refs/heads/trunk
Commit: 40040326763cdc655ea2707d8cb90a46791192c8
Parents: 6b91037
Author: Mike Percy <mpercy@apache.org>
Authored: Fri May 10 16:02:56 2013 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Fri May 10 16:02:56 2013 -0700

----------------------------------------------------------------------
 .../log4jappender/LoadBalancingLog4jAppender.java  |   32 ++++++-
 .../flume/clients/log4jappender/Log4jAppender.java |   65 +++++++++++---
 .../TestLoadBalancingLog4jAppender.java            |   63 +++++++++++++-
 .../clients/log4jappender/TestLog4jAppender.java   |   44 ++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    3 +
 5 files changed, 184 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
index 9fb115e..3172e21 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java
@@ -26,6 +26,7 @@ import org.apache.flume.api.RpcClientConfigurationConstants;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.api.RpcClientFactory.ClientType;
 import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
 
 /**
  *
@@ -82,6 +83,7 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
   private String hosts;
   private String selector;
   private String maxBackoff;
+  private boolean configured = false;
 
   public void setHosts(String hostNames) {
     this.hosts = hostNames;
@@ -95,6 +97,20 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
     this.maxBackoff = maxBackoff;
   }
 
+  @Override
+  public synchronized void append(LoggingEvent event) {
+    if(!configured) {
+      String errorMsg = "Flume Log4jAppender not configured correctly! Cannot" +
+        " send events to Flume.";
+      LogLog.error(errorMsg);
+      if(getUnsafeMode()) {
+        return;
+      }
+      throw new FlumeException(errorMsg);
+    }
+    super.append(event);
+  }
+
   /**
    * Activate the options set using <tt>setHosts()</tt>, <tt>setSelector</tt>
    * and <tt>setMaxBackoff</tt>
@@ -107,18 +123,26 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
     try {
       final Properties properties = getProperties(hosts, selector, maxBackoff);
       rpcClient = RpcClientFactory.getInstance(properties);
-    } catch (FlumeException e) {
+      if(layout != null) {
+        layout.activateOptions();
+      }
+      configured = true;
+    } catch (Exception e) {
       String errormsg = "RPC client creation failed! " + e.getMessage();
       LogLog.error(errormsg);
-      throw e;
+      if (getUnsafeMode()) {
+        return;
+      }
+      throw new FlumeException(e);
     }
+
   }
 
   private Properties getProperties(String hosts, String selector,
       String maxBackoff) throws FlumeException {
 
     if (StringUtils.isEmpty(hosts)) {
-      throw new IllegalArgumentException("hosts must not be null");
+      throw new FlumeException("hosts must not be null");
     }
 
     Properties props = new Properties();
@@ -141,7 +165,7 @@ public class LoadBalancingLog4jAppender extends Log4jAppender {
     if (!StringUtils.isEmpty(maxBackoff)) {
       long millis = Long.parseLong(maxBackoff.trim());
       if (millis <= 0) {
-        throw new IllegalArgumentException(
+        throw new FlumeException(
             "Misconfigured max backoff, value must be greater than 0");
       }
       props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF,

http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
index d61f807..0ba56d3 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
@@ -30,7 +30,6 @@ import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.event.EventBuilder;
 
 import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Layout;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
 
@@ -63,6 +62,7 @@ public class Log4jAppender extends AppenderSkeleton {
 
   private String hostname;
   private int port;
+  private boolean unsafeMode = false;
 
   RpcClient rpcClient = null;
 
@@ -101,9 +101,14 @@ public class Log4jAppender extends AppenderSkeleton {
     //setup by setting hostname and port and then calling activateOptions
     //or this appender object was closed by calling close(), so we throw an
     //exception to show the appender is no longer accessible.
-    if(rpcClient == null){
-      throw new FlumeException("Cannot Append to Appender!" +
-          "Appender either closed or not setup correctly!");
+    if (rpcClient == null) {
+      String errorMsg = "Cannot Append to Appender! Appender either closed or" +
+        " not setup correctly!";
+      LogLog.error(errorMsg);
+      if (unsafeMode) {
+        return;
+      }
+      throw new FlumeException(errorMsg);
     }
 
     if(!rpcClient.isActive()){
@@ -138,6 +143,9 @@ public class Log4jAppender extends AppenderSkeleton {
     } catch (EventDeliveryException e) {
       String msg = "Flume append() failed.";
       LogLog.error(msg);
+      if (unsafeMode) {
+        return;
+      }
       throw new FlumeException(msg + " Exception follows.", e);
     }
   }
@@ -152,11 +160,27 @@ public class Log4jAppender extends AppenderSkeleton {
    * @throws FlumeException if errors occur during close
    */
   @Override
-  public synchronized void close() throws FlumeException{
-    //Any append calls after this will result in an Exception.
+  public synchronized void close() throws FlumeException {
+    // Any append calls after this will result in an Exception.
     if (rpcClient != null) {
-      rpcClient.close();
-      rpcClient = null;
+      try {
+        rpcClient.close();
+      } catch (FlumeException ex) {
+        LogLog.error("Error while trying to close RpcClient.", ex);
+        if (unsafeMode) {
+          return;
+        }
+        throw ex;
+      } finally {
+        rpcClient = null;
+      }
+    } else {
+      String errorMsg = "Flume log4jappender already closed!";
+      LogLog.error(errorMsg);
+      if(unsafeMode) {
+        return;
+      }
+      throw new FlumeException(errorMsg);
     }
   }
 
@@ -184,25 +208,38 @@ public class Log4jAppender extends AppenderSkeleton {
   public void setPort(int port){
     this.port = port;
   }
+
+  public void setUnsafeMode(boolean unsafeMode) {
+    this.unsafeMode = unsafeMode;
+  }
+
+  public boolean getUnsafeMode() {
+    return unsafeMode;
+  }
+
   /**
    * Activate the options set using <tt>setPort()</tt>
    * and <tt>setHostname()</tt>
+   *
    * @throws FlumeException if the <tt>hostname</tt> and
-   *  <tt>port</tt> combination is invalid.
+   *                        <tt>port</tt> combination is invalid.
    */
   @Override
-  public void activateOptions() throws FlumeException{
+  public void activateOptions() throws FlumeException {
     try {
       rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
+      if (layout != null) {
+        layout.activateOptions();
+      }
     } catch (FlumeException e) {
       String errormsg = "RPC client creation failed! " +
-          e.getMessage();
+        e.getMessage();
       LogLog.error(errormsg);
+      if (unsafeMode) {
+        return;
+      }
       throw e;
     }
-    if(layout != null) {
-      layout.activateOptions();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
index 657af67..103bcb6 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java
@@ -56,7 +56,7 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
-public class TestLoadBalancingLog4jAppender{
+public class TestLoadBalancingLog4jAppender {
 
   private final List<CountingAvroSource> sources = Lists.newArrayList();
   private Channel ch;
@@ -89,7 +89,7 @@ public class TestLoadBalancingLog4jAppender{
     File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
         .getClassLoader()
         .getResource("flume-loadbalancinglog4jtest.properties").getFile());
-    startSources(TESTFILE, new int[] { 25430, 25431 });
+    startSources(TESTFILE, false, new int[] { 25430, 25431 });
 
     sendAndAssertMessages(numberOfMsgs);
 
@@ -104,7 +104,8 @@ public class TestLoadBalancingLog4jAppender{
     File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
         .getClassLoader()
         .getResource("flume-loadbalancing-rnd-log4jtest.properties").getFile());
-    startSources(TESTFILE, new int[] { 25430, 25431, 25432, 25433, 25434,
+    startSources(TESTFILE, false, new int[] { 25430, 25431, 25432, 25433,
+                                              25434,
         25435, 25436, 25437, 25438, 25439 });
 
     sendAndAssertMessages(numberOfMsgs);
@@ -126,7 +127,7 @@ public class TestLoadBalancingLog4jAppender{
         .getClassLoader()
         .getResource("flume-loadbalancing-backoff-log4jtest.properties")
         .getFile());
-    startSources(TESTFILE, new int[] { 25430, 25431, 25432 });
+    startSources(TESTFILE, false, new int[] { 25430, 25431, 25432 });
 
     sources.get(0).setFail();
     sources.get(2).setFail();
@@ -154,6 +155,39 @@ public class TestLoadBalancingLog4jAppender{
     Assert.assertEquals(0, sources.get(2).appendCount.intValue());
   }
 
+  @Test
+  public void testRandomBackoffUnsafeMode() throws Exception {
+    File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
+      .getClassLoader()
+      .getResource("flume-loadbalancing-backoff-log4jtest.properties")
+      .getFile());
+    startSources(TESTFILE, true, new int[]{25430, 25431, 25432});
+
+    sources.get(0).setFail();
+    sources.get(1).setFail();
+    sources.get(2).setFail();
+    sendAndAssertFail();
+
+  }
+
+  @Test(expected = EventDeliveryException.class)
+  public void testRandomBackoffNotUnsafeMode() throws Throwable {
+    File TESTFILE = new File(TestLoadBalancingLog4jAppender.class
+      .getClassLoader()
+      .getResource("flume-loadbalancing-backoff-log4jtest.properties")
+      .getFile());
+    startSources(TESTFILE, false, new int[]{25430, 25431, 25432});
+
+    sources.get(0).setFail();
+    sources.get(1).setFail();
+    sources.get(2).setFail();
+    try {
+      sendAndAssertFail();
+    } catch (FlumeException ex) {
+      throw ex.getCause();
+    }
+  }
+
   private void send(int numberOfMsgs) throws EventDeliveryException {
     for (int count = 0; count < numberOfMsgs; count++) {
       int level = count % 5;
@@ -162,6 +196,21 @@ public class TestLoadBalancingLog4jAppender{
     }
   }
 
+  private void sendAndAssertFail() throws IOException {
+      int level = 20000;
+      String msg = "This is log message number" + String.valueOf(level);
+      fixture.log(Level.toLevel(level), msg);
+
+      Transaction transaction = ch.getTransaction();
+      transaction.begin();
+      Event event = ch.take();
+      Assert.assertNull(event);
+
+      transaction.commit();
+      transaction.close();
+
+  }
+
   private void sendAndAssertMessages(int numberOfMsgs) throws IOException {
     for (int count = 0; count < numberOfMsgs; count++) {
       int level = count % 5;
@@ -194,7 +243,9 @@ public class TestLoadBalancingLog4jAppender{
 
   }
 
-  private void startSources(File log4jProps, int... ports) throws IOException {
+  private void startSources(File log4jProps, boolean unsafeMode, int... ports)
+    throws
+    IOException {
     for (int port : ports) {
       CountingAvroSource source = new CountingAvroSource(port);
       Context context = new Context();
@@ -218,6 +269,8 @@ public class TestLoadBalancingLog4jAppender{
     FileReader reader = new FileReader(log4jProps);
     Properties props = new Properties();
     props.load(reader);
+    props.setProperty("log4j.appender.out2.UnsafeMode",
+      String.valueOf(unsafeMode));
     PropertyConfigurator.configure(props);
     fixture = LogManager.getLogger(TestLoadBalancingLog4jAppender.class);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
index de88730..211837b 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
@@ -29,6 +29,8 @@ import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
@@ -118,6 +120,48 @@ public class TestLog4jAppender{
   }
 
   @Test
+  public void testLog4jAppenderFailureUnsafeMode() throws Throwable {
+    props.setProperty("log4j.appender.out2.UnsafeMode", String.valueOf(true));
+    PropertyConfigurator.configure(props);
+    Logger logger = LogManager.getLogger(TestLog4jAppender.class);
+    source.stop();
+    sendAndAssertFail(logger);
+
+  }
+
+  @Test(expected = EventDeliveryException.class)
+  public void testLog4jAppenderFailureNotUnsafeMode() throws Throwable {
+    PropertyConfigurator.configure(props);
+    Logger logger = LogManager.getLogger(TestLog4jAppender.class);
+    source.stop();
+    sendAndAssertFail(logger);
+
+  }
+
+  private void sendAndAssertFail(Logger logger) throws Throwable {
+      /*
+       * Log4j internally defines levels as multiples of 10000. So if we
+       * create levels directly using count, the level will be set as the
+       * default.
+       */
+    int level = 20000;
+    try {
+      logger.log(Level.toLevel(level), "Test Msg");
+    } catch (FlumeException ex) {
+      ex.printStackTrace();
+      throw ex.getCause();
+    }
+    Transaction transaction = ch.getTransaction();
+    transaction.begin();
+    Event event = ch.take();
+    Assert.assertNull(event);
+    transaction.commit();
+    transaction.close();
+
+  }
+
+
+  @Test
   public void testLayout() throws IOException {
     props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout");
     props.put("log4j.appender.out2.layout.ConversionPattern",

http://git-wip-us.apache.org/repos/asf/flume/blob/40040326/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2601,6 +2601,7 @@ Property Name  Default  Description
 =============  =======  ==========================================================================
 Hostname       --       The hostname on which a remote Flume agent is running with an avro source.
 Port           --       The port at which the remote Flume agent's avro source is listening.
+UnsafeMode     false    If true, the appender will not throw exceptions on failure to send the events.
 =============  =======  ==========================================================================
 
 
@@ -2612,6 +2613,7 @@ Sample log4j.properties file:
   log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
   log4j.appender.flume.Hostname = example.com
   log4j.appender.flume.Port = 41414
+  log4j.appender.flume.UnsafeMode = true
 
   # configure a class's logger to output to the flume appender
   log4j.logger.org.example.MyClass = DEBUG,flume
@@ -2637,6 +2639,7 @@ Selector       ROUND_ROBIN  Selection mechanism. Must be either ROUND_ROBIN,
 MaxBackoff     --           A long value representing the maximum amount of time in milliseconds
                             the Load balancing client will backoff from a node that has failed to
                             consume an event. Defaults to no backoff
+UnsafeMode     false        If true, the appender will not throw exceptions on failure to send the events.
 =============  ===========  ==========================================================================
 
 


Mime
View raw message