logging-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgo...@apache.org
Subject svn commit: r1499780 - in /logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender: FlumeAppender.java FlumeEvent.java FlumePersistentManager.java Log4jEventSource.java
Date Thu, 04 Jul 2013 15:12:44 GMT
Author: rgoers
Date: Thu Jul  4 15:12:43 2013
New Revision: 1499780

URL: http://svn.apache.org/r1499780
Log:
LOG4J2-279 - Move Berkeley DB I/O to its own thread pool to avoid interrupts. Disallow calls
from Avro or Flume to avoid circularity

Modified:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?rev=1499780&r1=1499779&r2=1499780&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
(original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
Thu Jul  4 15:12:43 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
+import java.io.Serializable;
+import java.util.Locale;
+
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
@@ -27,15 +30,15 @@ import org.apache.logging.log4j.core.con
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
 import org.apache.logging.log4j.core.layout.RFC5424Layout;
 
-import java.io.Serializable;
-import java.util.Locale;
-
 /**
  * An Appender that uses the Avro protocol to route events to Flume.
+ * @param <T> The {@link Layout}'s {@link Serializable} type.
  */
 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
 public final class FlumeAppender<T extends Serializable> extends AbstractAppender<T>
implements FlumeEventFactory {
 
+    private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
+
     private final AbstractFlumeManager manager;
 
     private final String mdcIncludes;
@@ -50,6 +53,9 @@ public final class FlumeAppender<T exten
 
     private final FlumeEventFactory factory;
 
+    /**
+     * Which Manager will be used by the appender instance.
+     */
     private enum ManagerType {
         AVRO, EMBEDDED, PERSISTENT;
 
@@ -79,7 +85,14 @@ public final class FlumeAppender<T exten
      */
     @Override
     public void append(final LogEvent event) {
-
+        String name = event.getLoggerName();
+        if (name != null) {
+            for (String pkg : EXCLUDED_PACKAGES) {
+                if (name.startsWith(pkg)) {
+                    return;
+                }
+            }
+        }
         final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes,
mdcRequired, mdcPrefix,
             eventPrefix, compressBody);
         flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
@@ -136,6 +149,8 @@ public final class FlumeAppender<T exten
      * @param factory The factory to use to create Flume events.
      * @param layout The layout to format the event.
      * @param filter A Filter to filter events.
+     * @param <S> The {@link Layout}'s {@link Serializable} type.
+     *
      * @return A Flume Avro Appender.
      */
     @PluginFactory
@@ -172,7 +187,8 @@ public final class FlumeAppender<T exten
                     managerType = ManagerType.getType(type);
                     LOGGER.warn("Embedded and type attributes are mutually exclusive. Using
type " + type);
                 } catch (Exception ex) {
-                    LOGGER.warn("Embedded and type attributes are mutually exclusive and
type " + type + " is invalid.");
+                    LOGGER.warn("Embedded and type attributes are mutually exclusive and
type " + type +
+                        " is invalid.");
                     managerType = ManagerType.EMBEDDED;
                 }
             } else {
@@ -196,8 +212,8 @@ public final class FlumeAppender<T exten
         final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
 
         if (layout == null) {
-            @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
-            Layout<S> l = (Layout<S>)RFC5424Layout.createLayout(null, null, null,
"True", null, mdcPrefix, eventPrefix,
+            @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable" })
+            Layout<S> l = (Layout<S>) RFC5424Layout.createLayout(null, null,
null, "True", null, mdcPrefix, eventPrefix,
                     null, null, null, excludes, includes, required, null, null, null, null);
             layout = l;
         }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java?rev=1499780&r1=1499779&r2=1499780&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
(original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
Thu Jul  4 15:12:43 2013
@@ -16,6 +16,14 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPOutputStream;
+
 import org.apache.flume.event.SimpleEvent;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LoggingException;
@@ -28,19 +36,12 @@ import org.apache.logging.log4j.message.
 import org.apache.logging.log4j.message.StructuredDataId;
 import org.apache.logging.log4j.message.StructuredDataMessage;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.GZIPOutputStream;
-
 /**
  * Class that is both a Flume and Log4j Event.
  */
 public class FlumeEvent extends SimpleEvent implements LogEvent {
 
+    static final String GUID = "guId";
     /**
      * Generated serial version ID.
      */
@@ -54,8 +55,6 @@ public class FlumeEvent extends SimpleEv
 
     private static final String EVENT_ID = "eventId";
 
-    static final String GUID = "guId";
-
     private static final String TIMESTAMP = "timeStamp";;
 
     private final LogEvent event;

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1499780&r1=1499779&r2=1499780&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
(original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
Thu Jul  4 15:12:43 2013
@@ -16,6 +16,35 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.PluginManager;
+import org.apache.logging.log4j.core.config.plugins.PluginType;
+import org.apache.logging.log4j.core.helpers.FileUtils;
+import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
+
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.CursorConfig;
 import com.sleepycat.je.Database;
@@ -27,34 +56,13 @@ import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.StatsConfig;
 import com.sleepycat.je.Transaction;
-import org.apache.flume.Event;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.logging.log4j.LoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.PluginManager;
-import org.apache.logging.log4j.core.config.plugins.PluginType;
-import org.apache.logging.log4j.core.helpers.FileUtils;
-import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
-
-import javax.crypto.Cipher;
-import javax.crypto.SecretKey;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 /**
- *
+ * Manager that persists data to Berkeley DB before passing it on to Flume.
  */
 public class FlumePersistentManager extends FlumeAvroManager {
 
+    /** Attribute name for the key provider. */
     public static final String KEY_PROVIDER = "keyProvider";
 
     private static final Charset UTF8 = Charset.forName("UTF-8");
@@ -63,6 +71,8 @@ public class FlumePersistentManager exte
 
     private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
 
+    private static final int SHUTDOWN_WAIT = 60;
+
     private static BDBManagerFactory factory = new BDBManagerFactory();
 
     private Database database;
@@ -77,12 +87,21 @@ public class FlumePersistentManager exte
 
     private final int delay;
 
+    private final ExecutorService threadPool;
+
     /**
      * Constructor
      * @param name The unique name of this manager.
+     * @param shortName Original name for the Manager.
      * @param agents An array of Agents.
      * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait for a connection to be established.
+     * @param requestTimeout The amount of time to wair for a response to a request.
+     * @param delay The amount of time to wait between retries.
      * @param database The database to write to.
+     * @param environment The database environment.
+     * @param secretKey The SecretKey to use for encryption.
      */
     protected FlumePersistentManager(final String name, final String shortName, final Agent[]
agents,
                                      final int batchSize, final int retries, final int connectionTimeout,
@@ -95,6 +114,7 @@ public class FlumePersistentManager exte
         this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey);
         this.worker.start();
         this.secretKey = secretKey;
+        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
     }
 
 
@@ -102,7 +122,13 @@ public class FlumePersistentManager exte
      * Returns a FlumeAvroManager.
      * @param name The name of the manager.
      * @param agents The agents to use.
+     * @param properties Properties to pass to the Manager.
      * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait to establish a connection.
+     * @param requestTimeout The amount of time to wait for a response to a request.
+     * @param delay Amount of time to delay before delivering a batch.
+     * @param dataDir The location of the Berkeley database.
      * @return A FlumeAvroManager.
      */
     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
Property[] properties,
@@ -156,19 +182,18 @@ public class FlumePersistentManager exte
                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                 eventData = cipher.doFinal(eventData);
             }
-            final DatabaseEntry key = new DatabaseEntry(keyData);
-            final DatabaseEntry data = new DatabaseEntry(eventData);
-            Transaction txn = environment.beginTransaction(null, null);
-            try {
-                database.put(txn, key, data);
-                txn.commit();
-                queue.add(keyData);
-            } catch (Exception ex) {
-                if (txn != null) {
-                    txn.abort();
+            Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData,
environment, database, queue));
+            boolean interrupted = false;
+            int count = 0;
+            do {
+                try {
+                    future.get();
+                } catch (InterruptedException ie) {
+                    interrupted = true;
+                    ++count;
                 }
-                throw ex;
-            }
+            } while (interrupted && count <= 1);
+
         } catch (Exception ex) {
             throw new LoggingException("Exception occurred writing log event", ex);
         }
@@ -178,6 +203,12 @@ public class FlumePersistentManager exte
     protected void releaseSub() {
         LOGGER.debug("Shutting down FlumePersistentManager");
         worker.shutdown();
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            LOGGER.warn("PersistentManager Thread pool failed to shut down");
+        }
         try {
             worker.join();
         } catch (InterruptedException ex) {
@@ -204,6 +235,44 @@ public class FlumePersistentManager exte
     }
 
     /**
+     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
+     */
+    private static class BDBWriter implements Callable<Integer> {
+        private final byte[] eventData;
+        private final byte[] keyData;
+        private final Environment environment;
+        private final Database database;
+        private final LinkedBlockingQueue<byte[]> queue;
+
+        public BDBWriter(byte[] keyData, byte[] eventData, Environment environment, Database
database,
+                         LinkedBlockingQueue<byte[]> queue) {
+            this.keyData = keyData;
+            this.eventData = eventData;
+            this.environment = environment;
+            this.database = database;
+            this.queue = queue;
+        }
+
+        @Override
+        public  Integer call() throws Exception {
+            final DatabaseEntry key = new DatabaseEntry(keyData);
+            final DatabaseEntry data = new DatabaseEntry(eventData);
+            Transaction txn = environment.beginTransaction(null, null);
+            try {
+                database.put(txn, key, data);
+                txn.commit();
+                queue.add(keyData);
+            } catch (Exception ex) {
+                if (txn != null) {
+                    txn.abort();
+                }
+                throw ex;
+            }
+            return eventData.length;
+        }
+    }
+
+    /**
      * Factory data.
      */
     private static class FactoryData {
@@ -326,6 +395,9 @@ public class FlumePersistentManager exte
         }
     }
 
+    /**
+     * Thread that sends data to Flume and pulls it from Berkeley DB.
+     */
     private static class WriterThread extends Thread  {
         private volatile boolean shutdown = false;
         private final Database database;
@@ -523,4 +595,31 @@ public class FlumePersistentManager exte
         }
 
     }
+
+    /**
+     * Factory that creates Daemon threads that can be properly shut down.
+     */
+    private static class DaemonThreadFactory implements ThreadFactory {
+        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        public DaemonThreadFactory() {
+            SecurityManager securityManager = System.getSecurityManager();
+            group = (securityManager != null) ? securityManager.getThreadGroup() :
+                Thread.currentThread().getThreadGroup();
+            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),
0);
+            thread.setDaemon(true);
+            if (thread.getPriority() != Thread.NORM_PRIORITY) {
+                thread.setPriority(Thread.NORM_PRIORITY);
+            }
+            return thread;
+        }
+
+    }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java?rev=1499780&r1=1499779&r2=1499780&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
(original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
Thu Jul  4 15:12:43 2013
@@ -21,8 +21,8 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractSource;
-import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *



Mime
View raw message