logging-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgo...@apache.org
Subject svn commit: r1457624 - in /logging/log4j/log4j2/trunk: ./ core/src/main/java/org/apache/logging/log4j/core/helpers/ flume-ng/ flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/ flume-ng/src/site/xdoc/ flume-ng/src/test/java/org/apache/logg...
Date Mon, 18 Mar 2013 03:15:43 GMT
Author: rgoers
Date: Mon Mar 18 03:15:41 2013
New Revision: 1457624

URL: http://svn.apache.org/r1457624
Log:
Add Persistent Flume Appender

Added:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
      - copied, changed from r1456570, logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml
      - copied, changed from r1456570, logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
Modified:
    logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/helpers/FileUtils.java
    logging/log4j/log4j2/trunk/flume-ng/pom.xml
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
    logging/log4j/log4j2/trunk/flume-ng/src/site/xdoc/index.xml
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
    logging/log4j/log4j2/trunk/pom.xml
    logging/log4j/log4j2/trunk/src/changes/changes.xml
    logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml

Modified: logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/helpers/FileUtils.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/helpers/FileUtils.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/helpers/FileUtils.java (original)
+++ logging/log4j/log4j2/trunk/core/src/main/java/org/apache/logging/log4j/core/helpers/FileUtils.java Mon Mar 18 03:15:41 2013
@@ -20,6 +20,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.status.StatusLogger;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -75,4 +76,25 @@ public final class FileUtils {
     public static boolean isFile(final URL url) {
         return url != null && (url.getProtocol().equals(PROTOCOL_FILE) || url.getProtocol().equals(JBOSS_FILE));
     }
+
+    /**
+     * Ensures that the given directory exists, creating it (and any missing parents) when allowed.
+     * @param dir the directory that shall exist
+     * @param createDirectoryIfNotExisting specifies if the directory shall be created if it does not exist.
+     * @throws java.io.IOException if the directory is missing and not created, or the path is not a directory.
+     */
+    public static void mkdir(final File dir, final boolean createDirectoryIfNotExisting) throws IOException {
+        // commons io FileUtils.forceMkdir would be useful here, we just want to omit this dependency
+        if (!dir.exists()) {
+            if (!createDirectoryIfNotExisting) {
+                throw new IOException("The directory " + dir.getAbsolutePath() + " does not exist.");
+            }
+            if (!dir.mkdirs() && !dir.isDirectory()) { // mkdirs() is also false if created concurrently
+                throw new IOException("Could not create directory " + dir.getAbsolutePath());
+            }
+        }
+        if (!dir.isDirectory()) {
+            throw new IOException("File " + dir + " exists and is not a directory. Unable to create directory.");
+        }
+    }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/pom.xml?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/pom.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/pom.xml Mon Mar 18 03:15:41 2013
@@ -44,6 +44,10 @@
       <artifactId>log4j-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.sleepycat</groupId>
+      <artifactId>je</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j.adapters</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
@@ -270,5 +274,35 @@
       </plugin>
     </plugins>
   </reporting>
+  <profiles>
+    <profile>
+      <!-- Runs the tests under the YourKit profiler agent: http://www.yourkit.com/docs/80/help/agent.jsp -->
+      <id>yourkit</id>
+      <!-- yourkit.home and the agent path below assume a macOS installation of YourKit 11.0.5. -->
+      <properties>
+        <yourkit.home>/Applications/YourKit_Java_Profiler_11.0.5.app</yourkit.home>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>com.yourkit</groupId>
+          <artifactId>yjp-controller-api-redist</artifactId>
+          <version>11.0.5</version>
+          <scope>system</scope>
+          <systemPath>${yourkit.home}/lib/yjp-controller-api-redist.jar</systemPath>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <argLine>-agentpath:"${yourkit.home}/bin/mac/libyjpagent.jnilib"</argLine>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
 

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java Mon Mar 18 03:15:41 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
+import org.apache.flume.event.SimpleEvent;
 import org.apache.logging.log4j.core.appender.AbstractManager;
 
 /**
@@ -27,5 +28,5 @@ public abstract class AbstractFlumeManag
         super(name);
     }
 
-    public abstract void send(FlumeEvent event, int delay, int retries);
+    public abstract void send(SimpleEvent event, int delay, int retries);
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Mon Mar 18 03:15:41 2013
@@ -27,6 +27,8 @@ import org.apache.logging.log4j.core.con
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
 import org.apache.logging.log4j.core.layout.RFC5424Layout;
 
+import java.util.Locale;
+
 /**
  * An Appender that uses the Avro protocol to route events to Flume.
  */
@@ -51,6 +53,14 @@ public final class FlumeAppender extends
 
     private final FlumeEventFactory factory;
 
+    private enum ManagerType {
+        AVRO, EMBEDDED, PERSISTENT;
+        /** Looks up the constant for a type name ignoring case; valueOf throws for unknown names. */
+        public static ManagerType getType(String type) {
+            return valueOf(type.toUpperCase(Locale.US));
+        }
+    }
+
     private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
                           final String includes, final String excludes, final String required, final String mdcPrefix,
                           final String eventPrefix, final boolean compress, final int delay, final int retries,
@@ -109,6 +119,8 @@ public final class FlumeAppender extends
      * @param agents An array of Agents.
      * @param properties Properties to pass to the embedded agent.
      * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
+     * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
+     * @param type Avro (default), Embedded, or Persistent.
      * @param dataDir The directory where the Flume FileChannel should write its data.
      * @param delay The amount of time in milliseconds to wait between retries.
      * @param agentRetries The number of times to retry an agent before failing to the next agent.
@@ -130,6 +142,7 @@ public final class FlumeAppender extends
     public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
                                                    @PluginElement("properties") final Property[] properties,
                                                    @PluginAttr("embedded") final String embedded,
+                                                   @PluginAttr("type") final String type,
                                                    @PluginAttr("dataDir") final String dataDir,
                                                    @PluginAttr("reconnectionDelay") final String delay,
                                                    @PluginAttr("agentRetries") final String agentRetries,
@@ -150,6 +163,29 @@ public final class FlumeAppender extends
             (agents == null || agents.length == 0) && properties != null && properties.length > 0;
         final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
         final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
+        ManagerType managerType;
+        if (type != null) {
+            if (embed) {
+                try {
+                    managerType = ManagerType.getType(type);
+                    LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
+                } catch (Exception ex) {
+                    LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
+                    managerType = ManagerType.EMBEDDED;
+                }
+            } else {
+                try {
+                    managerType = ManagerType.getType(type);
+                } catch (Exception ex) {
+                    LOGGER.warn("Type " + type + " is invalid.");
+                    managerType = ManagerType.EMBEDDED;
+                }
+            }
+        }  else if (embed) {
+           managerType = ManagerType.EMBEDDED;
+        }  else {
+           managerType = ManagerType.AVRO;
+        }
 
         final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
         final int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
@@ -167,14 +203,31 @@ public final class FlumeAppender extends
 
         AbstractFlumeManager manager;
 
-        if (embed) {
-            manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
-        } else {
-            if (agents == null || agents.length == 0) {
-                LOGGER.debug("No agents provided, using defaults");
-                agents = new Agent[] {Agent.createAgent(null, null)};
-            }
-            manager = FlumeAvroManager.getManager(name, agents, batchCount);
+        switch (managerType) {
+            case EMBEDDED:
+                manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
+                break;
+            case AVRO:
+                if (agents == null || agents.length == 0) {
+                    LOGGER.debug("No agents provided, using defaults");
+                    agents = new Agent[] {Agent.createAgent(null, null)};
+                }
+                manager = FlumeAvroManager.getManager(name, agents, batchCount);
+                break;
+            case PERSISTENT:
+                if (agents == null || agents.length == 0) {
+                    LOGGER.debug("No agents provided, using defaults");
+                    agents = new Agent[] {Agent.createAgent(null, null)};
+                }
+                manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, reconnectDelay, dataDir);
+                break;
+            default:
+                LOGGER.debug("No manager type specified. Defaulting to AVRO");
+                if (agents == null || agents.length == 0) {
+                    LOGGER.debug("No agents provided, using defaults");
+                    agents = new Agent[] {Agent.createAgent(null, null)};
+                }
+                manager = FlumeAvroManager.getManager(name, agents, batchCount);
         }
 
         if (manager == null) {

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Mon Mar 18 03:15:41 2013
@@ -20,6 +20,7 @@ import org.apache.avro.AvroRemoteExcepti
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
@@ -119,7 +120,7 @@ public class FlumeAvroManager extends Ab
     }
 
     @Override
-    public synchronized void send(final FlumeEvent event, int delay, int retries)  {
+    public synchronized void send(final SimpleEvent event, int delay, int retries)  {
         if (delay == 0) {
             delay = DEFAULT_RECONNECTION_DELAY;
         }
@@ -158,7 +159,7 @@ public class FlumeAvroManager extends Ab
                     return;
                 } catch (final Exception ex) {
                     if (i == retries - 1) {
-                        msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
+                        msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
                             agents[current].getPort();
                         LOGGER.warn(msg, ex);
                         break;
@@ -191,7 +192,7 @@ public class FlumeAvroManager extends Ab
                         return;
                     } catch (final Exception ex) {
                         if (i == retries - 1) {
-                            final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
+                            final String warnMsg = "Unable to write to " + getName() + " at " + agent.getHost() + ":" +
                                 agent.getPort();
                             LOGGER.warn(warnMsg, ex);
                             break;

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java Mon Mar 18 03:15:41 2013
@@ -17,6 +17,7 @@
 package org.apache.logging.log4j.flume.appender;
 
 import org.apache.flume.SourceRunner;
+import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.node.NodeConfiguration;
 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
 import org.apache.logging.log4j.core.appender.ManagerFactory;
@@ -118,7 +119,7 @@ public class FlumeEmbeddedManager extend
     }
 
     @Override
-    public void send(final FlumeEvent event, final int delay, final int retries) {
+    public void send(final SimpleEvent event, final int delay, final int retries) {
         source.send(event);
     }
 

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java Mon Mar 18 03:15:41 2013
@@ -54,7 +54,7 @@ public class FlumeEvent extends SimpleEv
 
     private static final String EVENT_ID = "eventId";
 
-    private static final String GUID = "guId";
+    static final String GUID = "guId";
 
     private static final String TIMESTAMP = "timeStamp";;
 

Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1457624&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Mon Mar 18 03:15:41 2013
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.StatsConfig;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.helpers.FileUtils;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.charset.Charset;
+import java.security.SecureRandom;
+import java.security.spec.KeySpec;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
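+ * Flume manager that stores each event in a Berkeley DB (JE) database and hands its key to a writer thread.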
+ */
+public class FlumePersistentManager extends FlumeAvroManager {
+
+    public static final String PASSWORD = "password";
+
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+
+    private static final String SHUTDOWN = "Shutdown";
+
+    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
+
+    private static ManagerFactory factory = new  BDBManagerFactory();
+
+    private Database database;
+
+    private final WriterThread worker;
+
+    private final int reconnectionDelay;
+
+    private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
+
+    private final SecretKey secretKey;
+
+    /** The default reconnection delay (5 minutes). */
+    public static final int DEFAULT_DELAY = 1000 * 60 * 5;
+
+    /**
+     * Creates the manager; the worker thread is started last so it only sees fully assigned fields.
+     * @param name The unique name of this manager.
+     * @param shortName The name that was supplied to getManager.
+     * @param agents An array of Agents.
+     * @param batchSize The number of events to include in a batch.
+     * @param reconnectionDelay The reconnection delay; zero or less selects DEFAULT_DELAY.
+     * @param database The database to write to.
+     * @param secretKey The AES key used to encrypt stored events, or null to store them unencrypted.
+     */
+    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
+                                     final int batchSize, final int reconnectionDelay, final Database database,
+                                     SecretKey secretKey) {
+        super(name, shortName, agents, batchSize);
+        this.database = database;
+        this.reconnectionDelay = reconnectionDelay <= 0 ? DEFAULT_DELAY : reconnectionDelay;
+        this.secretKey = secretKey;
+        this.worker = new WriterThread(database, this, queue, secretKey);
+        this.worker.start();
+    }
+
+    /**
+     * Returns a FlumePersistentManager.
+     * @param name The name of the manager.
+     * @param agents The agents to use.
+     * @param properties Properties for the manager; a "password" entry enables encryption of stored events.
+     * @param batchSize The number of events to include in a batch.
+     * @param reconnectionDelay The reconnection delay; values of zero or less select DEFAULT_DELAY.
+     * @param dataDir The directory for the database; null or empty selects the default directory.
+     * @return A FlumePersistentManager.
+     */
+    public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
+                                                    int batchSize, final int reconnectionDelay, final String dataDir) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+        String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
+
+        final StringBuilder sb = new StringBuilder("FlumeKrati[");
+        for (int i = 0; i < agents.length; ++i) {
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append(agents[i].getHost()).append(":").append(agents[i].getPort());
+        }
+        sb.append("]").append(" ").append(dataDirectory);
+        // Pass the resolved directory: the factory opens it directly and cannot handle a null dataDir.
+        return (FlumePersistentManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize,
+            reconnectionDelay, dataDirectory, properties));
+    }
+
+    @Override
+    public synchronized void send(final SimpleEvent event, int delay, int retries)  {
+        if (worker.isShutdown()) {
+            throw new LoggingException("Unable to record event");
+        }
+        // Record layout: key = GUID header bytes; value = body length, body, header count, name/value pairs.
+        Map<String, String> headers = event.getHeaders();
+        byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); // NOTE(review): NPE if no GUID header
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream daos = new DataOutputStream(baos);
+            daos.writeInt(event.getBody().length);
+            daos.write(event.getBody(), 0, event.getBody().length);
+            daos.writeInt(event.getHeaders().size());
+            for (Map.Entry<String, String> entry : headers.entrySet()) {
+                daos.writeUTF(entry.getKey());
+                daos.writeUTF(entry.getValue());
+            }
+            byte[] eventData = baos.toByteArray();
+            if (secretKey != null) {
+                Cipher cipher = Cipher.getInstance("AES");
+                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
+                eventData = cipher.doFinal(eventData);
+            }
+            final DatabaseEntry key = new DatabaseEntry(keyData);
+            final DatabaseEntry data = new DatabaseEntry(eventData);
+            database.put(null, key, data);
+            queue.add(keyData);
+        } catch (Exception ex) {
+            throw new LoggingException("Exception occurred writing log event", ex);
+        }
+    }
+
+    @Override
+    protected void releaseSub() {
+        worker.shutdown();
+        try {
+            worker.join();
+        } catch (InterruptedException ex) {
+            LOGGER.debug("Interrupted while waiting for worker to complete");
+        }
+        try {
+            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
+            database.close();
+        } catch (final Exception ex) {
+            LOGGER.warn("Failed to close database", ex);
+        }
+        super.releaseSub();
+    }
+
+    private void doSend(final SimpleEvent event) {
+        LOGGER.debug("Sending event to Flume");
+        super.send(event, 1, 1);
+    }
+
+    /** Carries the getManager arguments to the manager factory. */
+    private static class FactoryData {
+        private final String name;
+        private final Agent[] agents;
+        private final int batchSize;
+        private final String dataDir;
+        private final int reconnectionDelay;
+        private final Property[] properties;
+
+        /**
+         * Constructor.
+         * @param name The name of the Appender.
+         * @param agents The agents.
+         * @param batchSize The number of events to include in a batch.
+         * @param reconnectionDelay The reconnection delay passed on to the manager.
+         * @param dataDir The directory for data.
+         * @param properties Properties from the configuration; the factory looks for a "password" entry.
+         */
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int reconnectionDelay,
+                           final String dataDir, final Property[] properties) {
+            this.name = name;
+            this.agents = agents;
+            this.batchSize = batchSize;
+            this.dataDir = dataDir;
+            this.reconnectionDelay = reconnectionDelay;
+            this.properties = properties;
+        }
+    }
+
+    /**
+     * Factory that creates FlumePersistentManagers backed by a Berkeley DB (JE) database.
+     */
+    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
+
+        /**
+         * Create the FlumePersistentManager.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumePersistentManager, or null if it could not be created.
+         */
+        public FlumePersistentManager createManager(final String name, final FactoryData data) {
+            SecretKey secretKey = null;
+            byte[] salt;
+
+            Database database;
+
+            Map<String, String> properties = new HashMap<String, String>();
+            if (data.properties != null) {
+                for (Property property : data.properties) {
+                    properties.put(property.getName(), property.getValue());
+                }
+            }
+
+            try {
+
+                File dir = new File(data.dataDir);
+                FileUtils.mkdir(dir, true);
+                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
+                dbEnvConfig.setTransactional(false);
+                dbEnvConfig.setAllowCreate(true);
+                final Environment environment = new Environment(dir, dbEnvConfig);
+                final DatabaseConfig dbConfig = new DatabaseConfig();
+                dbConfig.setTransactional(false);
+                dbConfig.setAllowCreate(true);
+                database = environment.openDatabase(null, name, dbConfig);
+            } catch (final Exception ex) {
+                LOGGER.error("Could not create FlumePersistentManager", ex);
+                return null;
+            }
+            // NOTE(review): the salt file streams below are not closed if read() or write() throws.
+            try {
+                if (properties.containsKey(PASSWORD)) {
+                    String password = properties.get(PASSWORD);
+                    salt = new byte[20];
+                    File saltFile = new File(data.dataDir + "/salt.dat");
+                    boolean needSalt = true;
+                    if (saltFile.exists()) {
+                        FileInputStream fis = new FileInputStream(saltFile);
+                        if (fis.read(salt) == 20) {
+                            needSalt = false;
+                        }
+                        fis.close();
+                    }
+                    if (needSalt) {
+                        Random r = new SecureRandom();
+                        r.nextBytes(salt);
+                        FileOutputStream fos = new FileOutputStream(saltFile);
+                        fos.write(salt);
+                        fos.close();
+                    }
+                    SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1");
+                    KeySpec spec = new PBEKeySpec(password.toCharArray(), salt, 65536, 256);
+                    SecretKey tmp = factory.generateSecret(spec);
+                    secretKey = new SecretKeySpec(tmp.getEncoded(), "AES");
+                }
+                return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.reconnectionDelay,
+                    database, secretKey);
+            } catch (Exception ex) {
+                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
+                // NOTE(review): despite the message, null is returned below and the open database is not closed.
+            }
+            return null;
+        }
+    }
+
+    private static class WriterThread extends Thread  {
+        private volatile boolean shutdown = false;
+        private final Database database;
+        private final FlumePersistentManager manager;
+        private final LinkedBlockingQueue<byte[]> queue;
+        private final SecretKey secretKey;
+
+        public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue,
+                            SecretKey secretKey) {
+            this.database = database;
+            this.manager = manager;
+            this.queue = queue;
+            this.secretKey = secretKey;
+        }
+
+        public void shutdown() {
+            this.shutdown = true;
+            if (queue.size() == 0) {
+                queue.add(SHUTDOWN.getBytes(UTF8));
+            }
+        }
+
+        public boolean isShutdown() {
+            return shutdown;
+        }
+
+        @Override
+        public void run() {
+            LOGGER.trace("WriterThread started");
+            while (!shutdown) {
+                try {
+                    boolean errors = false;
+                    final DatabaseEntry key = new DatabaseEntry();
+                    final DatabaseEntry data = new DatabaseEntry();
+                    final Cursor cursor = database.openCursor(null, null);
+                    try {
+                        queue.clear();
+                        OperationStatus status;
+                        try {
+                            status = cursor.getFirst(key, data, LockMode.RMW);
+
+                            while (status == OperationStatus.SUCCESS) {
+                                SimpleEvent event = new SimpleEvent();
+                                try {
+                                    byte[] eventData = data.getData();
+                                    if (secretKey != null) {
+                                        Cipher cipher = Cipher.getInstance("AES");
+                                        cipher.init(Cipher.DECRYPT_MODE, secretKey);
+                                        eventData = cipher.doFinal(eventData);
+                                    }
+                                    ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
+                                    DataInputStream dais = new DataInputStream(bais);
+                                    int length = dais.readInt();
+                                    byte[] bytes = new byte[length];
+                                    dais.read(bytes, 0, length);
+                                    event.setBody(bytes);
+                                    length = dais.readInt();
+                                    Map<String, String> map = new HashMap<String, String>(length);
+                                    for (int i = 0; i < length; ++i) {
+                                        String headerKey = dais.readUTF();
+                                        String value = dais.readUTF();
+                                        map.put(headerKey, value);
+                                    }
+                                    event.setHeaders(map);
+                                } catch (Exception ex) {
+                                    errors = true;
+                                    LOGGER.error("Error retrieving event", ex);
+                                    continue;
+                                }
+                                try {
+                                    manager.doSend(event);
+                                } catch (Exception ioe) {
+                                    errors = true;
+                                    LOGGER.error("Error sending event", ioe);
+                                    break;
+                                }
+                                if (!errors) {
+                                    try {
+                                        cursor.delete();
+                                    } catch (Exception ex) {
+                                        LOGGER.error("Unable to delete event", ex);
+                                    }
+                                }
+                                status = cursor.getNext(key, data, LockMode.RMW);
+                            }
+                        } catch (Exception ex) {
+                            LOGGER.error("Error reading database", ex);
+                            shutdown = true;
+                            break;
+                        }
+
+                    } finally {
+                        cursor.close();
+                    }
+                    if (errors) {
+                        Thread.sleep(manager.reconnectionDelay);
+                        continue;
+                    }
+                } catch (Exception ex) {
+                    LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
+                }
+                try {
+                    if (database.count() > 0) {
+                        continue;
+                    }
+                    queue.take();
+                    LOGGER.debug("WriterThread notified of work");
+                } catch (InterruptedException ie) {
+                    LOGGER.warn("WriterThread interrupted, continuing");
+                } catch (Exception ex) {
+                    LOGGER.error("WriterThread encountered an exception waiting for work", ex);
+                    break;
+                }
+            }
+            LOGGER.trace("WriterThread exiting");
+        }
+
+    }
+}

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java Mon Mar 18 03:15:41 2013
@@ -18,6 +18,7 @@ package org.apache.logging.log4j.flume.a
 
 import org.apache.flume.ChannelException;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractSource;
 import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class Log4jEventSource extends Ab
     }
 
 
-    public void send(final FlumeEvent event) {
+    public void send(final SimpleEvent event) {
         sourceCounter.incrementAppendReceivedCount();
         sourceCounter.incrementEventReceivedCount();
         try {

Modified: logging/log4j/log4j2/trunk/flume-ng/src/site/xdoc/index.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/site/xdoc/index.xml?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/site/xdoc/index.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/site/xdoc/index.xml Mon Mar 18 03:15:41 2013
@@ -34,6 +34,36 @@
               transitive dependencies necessary for the remote Agent to function to be included.
             </p>
           </subsection>
+          <subsection name="Persistent Agent">
+            <p>
+              The persistent agent uses Berkeley DB. The following dependency should be added to your pom.xml
+              in addition to the log4j-flume-ng dependency.
+            </p>
+            <source><![CDATA[
+            <dependencyManagement>
+              <dependencies>
+                <dependency>
+                  <groupId>com.sleepycat</groupId>
+                  <artifactId>je</artifactId>
+                  <version>5.0.73</version>
+                </dependency>
+              </dependencies>
+            </dependencyManagement>
+            <dependencies>
+              <dependency>
+                <groupId>com.sleepycat</groupId>
+                <artifactId>je</artifactId>
+              </dependency>
+            </dependencies>
+            <repositories>
+              <repository>
+                <id>oracleReleases</id>
+                <name>Oracle Released Java Packages</name>
+                <url>http://download.oracle.com/maven</url>
+                <layout>default</layout>
+              </repository>
+            </repositories>]]></source>
+          </subsection>
 
           <subsection name="Embedded Agent">
             <p>

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Mon Mar 18 03:15:41 2013
@@ -129,8 +129,8 @@ public class FlumeAppenderTest {
     @Test
     public void testLog4jAvroAppender() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
-            "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
+            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -155,8 +155,8 @@ public class FlumeAppenderTest {
     @Test
     public void testStructured() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
-            "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
+            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         final Logger eventLogger = (Logger) LogManager.getLogger("EventLogger");
         Assert.assertNotNull(eventLogger);
@@ -193,8 +193,8 @@ public class FlumeAppenderTest {
     @Test
     public void testMultiple() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
-            "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
+            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -223,8 +223,8 @@ public class FlumeAppenderTest {
      @Test
     public void testBatch() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
-            "false", null, null, null, null, null, "true", "10", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
+            "avro", "false", null, null, null, null, null, "true", "10", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -254,8 +254,8 @@ public class FlumeAppenderTest {
     @Test
     public void testConnectionRefused() {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
-            "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
+            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -280,8 +280,8 @@ public class FlumeAppenderTest {
         final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
             Agent.createAgent("localhost", altPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
-            "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
+            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         Assert.assertTrue("Appender Not started", avroAppender.isStarted());
         avroLogger.addAppender(avroAppender);
@@ -325,8 +325,8 @@ public class FlumeAppenderTest {
         final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
                                       Agent.createAgent("localhost", altPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
-            "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, null, "Avro", null, "100", "3",
+            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java Mon Mar 18 03:15:41 2013
@@ -193,6 +193,19 @@ public class FlumeEmbeddedAppenderTest {
         }
     }
 
+    /**
+     * Logs a fixed number of structured events and prints how long the logging calls took.
+     * Nothing is asserted; the figure is for manual comparison only.
+     */
+    @Test
+    public void testPerformance() throws Exception {
+        // nanoTime is monotonic, so the measurement is not disturbed by wall-clock adjustments.
+        final long start = System.nanoTime();
+        final int count = 10000;
+        for (int i = 0; i < count; ++i) {
+            final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
+            msg.put("counter", Integer.toString(i));
+            EventLogger.logEvent(msg);
+        }
+        final long elapsed = (System.nanoTime() - start) / 1000000L;
+        System.out.println("Time to log " + count + " events " + elapsed + "ms");
+    }
+
 
     private String getBody(final Event event) throws IOException {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();

Copied: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (from r1456570, logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java)
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java?p2=logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java&p1=logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java&r1=1456570&r2=1457624&rev=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java Mon Mar 18 03:15:41 2013
@@ -50,10 +50,10 @@ import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -62,8 +62,8 @@ import java.util.zip.GZIPInputStream;
 /**
  *
  */
-public class FlumeEmbeddedAgentTest {
-    private static final String CONFIG = "default_embedded.xml";
+public class FlumePersistentAppenderTest {
+    private static final String CONFIG = "persistent.xml";
     private static final String HOSTNAME = "localhost";
     private static LoggerContext ctx;
 
@@ -87,7 +87,7 @@ public class FlumeEmbeddedAgentTest {
     @Before
     public void setUp() throws Exception {
 
-        final File file = new File("target/file-channel");
+        final File file = new File("target/persistent");
         final boolean result = deleteFiles(file);
 
         /*
@@ -141,15 +141,24 @@ public class FlumeEmbeddedAgentTest {
 
         for (int i = 0; i < 10; ++i) {
             final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test");
+            msg.put("counter", Integer.toString(i));
             EventLogger.logEvent(msg);
         }
+        boolean[] fields = new boolean[10];
         for (int i = 0; i < 10; ++i) {
             final Event event = primary.poll();
-            Assert.assertNotNull(event);
-            final String body = getBody(event);
-            final String expected = "Test Multiple " + i;
-            Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
-                body.endsWith(expected));
+            Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event);
+            final String value = event.getHeaders().get("counter");
+            Assert.assertNotNull("Missing counter", value);
+            final int counter = Integer.parseInt(value);
+            if (fields[counter]) {
+                Assert.fail("Duplicate event");
+            } else {
+                fields[counter] = true;
+            }
+        }
+        for (int i = 0; i < 10; ++i) {
+            Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]);
         }
     }
 
@@ -160,15 +169,24 @@ public class FlumeEmbeddedAgentTest {
         logger.debug("Starting testFailover");
         for (int i = 0; i < 10; ++i) {
             final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
+            msg.put("counter", Integer.toString(i));
             EventLogger.logEvent(msg);
         }
+        boolean[] fields = new boolean[10];
         for (int i = 0; i < 10; ++i) {
             final Event event = primary.poll();
-            Assert.assertNotNull(event);
-            final String body = getBody(event);
-            final String expected = "Test Primary " + i;
-            Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
-                body.endsWith(expected));
+            Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event);
+            final String value = event.getHeaders().get("counter");
+            Assert.assertNotNull("Missing counter", value);
+            final int counter = Integer.parseInt(value);
+            if (fields[counter]) {
+                Assert.fail("Duplicate event");
+            } else {
+                fields[counter] = true;
+            }
+        }
+        for (int i = 0; i < 10; ++i) {
+            Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]);
         }
 
         // Give the AvroSink time to receive notification and notify the channel.
@@ -176,21 +194,40 @@ public class FlumeEmbeddedAgentTest {
 
         primary.stop();
 
-
         for (int i = 0; i < 10; ++i) {
             final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test");
+            msg.put("cntr", Integer.toString(i));
             EventLogger.logEvent(msg);
         }
+        fields = new boolean[10];
         for (int i = 0; i < 10; ++i) {
             final Event event = alternate.poll();
-            Assert.assertNotNull(event);
-            final String body = getBody(event);
-            final String expected = "Test Alternate " + i;
-            /* When running in Gump Flume consistently returns the last event from the primary channel after
-               the failover, which fails this test */
-            Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
-                " Received: " + body, body.endsWith(expected));
+            Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event);
+            final String value = event.getHeaders().get("cntr");
+            Assert.assertNotNull("Missing counter", value);
+            final int counter = Integer.parseInt(value);
+            if (fields[counter]) {
+                Assert.fail("Duplicate event");
+            } else {
+                fields[counter] = true;
+            }
+        }
+        for (int i = 0; i < 10; ++i) {
+            Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]);
+        }
+    }
+
+    @Test
+    public void testPerformance() throws Exception {
+        long start = System.currentTimeMillis();
+        int count = 1000;
+        for (int i = 0; i < count; ++i) {
+            final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
+            msg.put("counter", Integer.toString(i));
+            EventLogger.logEvent(msg);
         }
+        long elapsed = System.currentTimeMillis() - start;
+        System.out.println("Time to log " + count + " events " + elapsed + "ms");
     }
 
 

Added: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java?rev=1457624&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java Mon Mar 18 03:15:41 2013
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.apache.logging.log4j.EventLogger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.XMLConfigurationFactory;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+import org.apache.logging.log4j.status.StatusLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Manual timing check for the Flume appender configured in persistent.xml: logs a fixed number of
+ * events and prints the elapsed time. It makes no assertions about what the collectors receive.
+ */
+public class FlumePersistentPerf {
+    private static final String CONFIG = "persistent.xml";
+    private static final String HOSTNAME = "localhost";
+    private static LoggerContext ctx;
+
+    private EventCollector primary;
+    private EventCollector alternate;
+
+    @BeforeClass
+    public static void setupClass() {
+        // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+        // NOTE(review): setUp clears target/persistent, which is the dataDir in persistent.xml; this
+        // path looks like a leftover from the embedded-agent test - confirm before changing it.
+        final File file = new File("target/file-channel");
+        if (!deleteFiles(file)) {
+            System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
+        }
+    }
+
+    @AfterClass
+    public static void cleanupClass() {
+        StatusLogger.getLogger().reset();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        // Best effort: start from an empty data directory.
+        deleteFiles(new File("target/persistent"));
+
+        /*
+        * Start a collector on each of two free ports and publish the ports through the system
+        * properties before the logging configuration is loaded.
+        */
+        int[] ports = findFreePorts(2);
+        System.setProperty("primaryPort", Integer.toString(ports[0]));
+        System.setProperty("alternatePort", Integer.toString(ports[1]));
+        primary = new EventCollector(ports[0]);
+        alternate = new EventCollector(ports[1]);
+        System.setProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG);
+        ctx = (LoggerContext) LogManager.getContext(false);
+        ctx.reconfigure();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        System.clearProperty(XMLConfigurationFactory.CONFIGURATION_FILE_PROPERTY);
+        ctx.reconfigure();
+        primary.stop();
+        alternate.stop();
+        // NOTE(review): same directory question as in setupClass - confirm which path is intended.
+        deleteFiles(new File("target/file-channel"));
+        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        final Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
+        for (final ObjectName name : names) {
+            try {
+                server.unregisterMBean(name);
+            } catch (final Exception ex) {
+                System.out.println("Unable to unregister " + name.toString());
+            }
+        }
+    }
+
+    /**
+     * Logs a fixed number of structured events and prints how long the logging calls took.
+     */
+    @Test
+    public void testPerformance() throws Exception {
+        // nanoTime is monotonic, so the measurement is not disturbed by wall-clock adjustments.
+        final long start = System.nanoTime();
+        final int count = 10000;
+        for (int i = 0; i < count; ++i) {
+            final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
+            msg.put("counter", Integer.toString(i));
+            EventLogger.logEvent(msg);
+        }
+        final long elapsed = (System.nanoTime() - start) / 1000000L;
+        System.out.println("Time to log " + count + " events " + elapsed + "ms");
+    }
+
+
+    /**
+     * Gunzips the body of the event and returns it as a String.
+     *
+     * @param event the event whose body is GZIP compressed.
+     * @return the decompressed body, decoded with the platform default charset.
+     * @throws IOException if the body cannot be decompressed.
+     */
+    private String getBody(final Event event) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+        try {
+            int n = 0;
+            while (-1 != (n = is.read())) {
+                baos.write(n);
+            }
+        } finally {
+            // Closing the GZIP stream releases its inflater.
+            is.close();
+        }
+        return new String(baos.toByteArray());
+
+    }
+
+    /**
+     * Deletes the file, or the directory and everything below it.
+     *
+     * @param file the file or directory to remove.
+     * @return true if nothing was there or everything was deleted, false if any delete failed.
+     */
+    private static boolean deleteFiles(final File file) {
+        boolean result = true;
+        if (file.isDirectory()) {
+
+            // listFiles returns null when the directory cannot be read.
+            final File[] files = file.listFiles();
+            if (files != null) {
+                for (final File child : files) {
+                    result &= deleteFiles(child);
+                }
+            }
+
+        } else if (!file.exists()) {
+            return true;
+        }
+
+        // Always attempt the delete, even if a child could not be removed.
+        final boolean deleted = file.delete();
+        return result && deleted;
+    }
+
+    /**
+     * Avro source endpoint that stores every event it receives in a queue for the test to poll.
+     */
+    private static class EventCollector implements AvroSourceProtocol {
+        private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new LinkedBlockingQueue<AvroFlumeEvent>();
+
+        private final NettyServer nettyServer;
+
+
+        public EventCollector(int port) {
+            Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
+            nettyServer = new NettyServer(responder, new InetSocketAddress(HOSTNAME, port));
+            nettyServer.start();
+        }
+
+        public void stop() {
+            nettyServer.close();
+        }
+
+        /**
+         * Waits up to 30 seconds for the next received event.
+         *
+         * @return the event, or null if none arrived in time or the wait was interrupted.
+         */
+        public Event poll() {
+
+            AvroFlumeEvent avroEvent = null;
+            try {
+                avroEvent = eventQueue.poll(30000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ie) {
+                // Treat the interrupt as "no event" but keep the interrupt status for the caller.
+                Thread.currentThread().interrupt();
+            }
+            if (avroEvent != null) {
+                return EventBuilder.withBody(avroEvent.getBody().array(),
+                    toStringMap(avroEvent.getHeaders()));
+            } else {
+                System.out.println("No Event returned");
+            }
+            return null;
+        }
+
+        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+            eventQueue.add(event);
+            return Status.OK;
+        }
+
+        public Status appendBatch(List<AvroFlumeEvent> events)
+            throws AvroRemoteException {
+            Preconditions.checkState(eventQueue.addAll(events));
+            return Status.OK;
+        }
+    }
+
+    /**
+     * Copies the map, converting every key and value with {@code toString()}.
+     */
+    private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
+        Map<String, String> stringMap = new HashMap<String, String>();
+        for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
+            stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+        return stringMap;
+    }
+
+    /**
+     * Finds free ports by binding server sockets to port 0 and closing them again. All sockets are
+     * held open until every port has been chosen, so the returned ports are distinct.
+     *
+     * @param count the number of ports wanted.
+     * @return the port numbers that were free at the time of the call.
+     * @throws IOException if a socket cannot be opened.
+     */
+    private static int[] findFreePorts(int count) throws IOException {
+        int[] ports = new int[count];
+        ServerSocket[] sockets = new ServerSocket[count];
+        try {
+            for (int i = 0; i < count; ++i) {
+                sockets[i] = new ServerSocket(0);
+                ports[i] = sockets[i].getLocalPort();
+            }
+        } finally {
+            for (int i = 0; i < count; ++i) {
+                if (sockets[i] != null) {
+                    try {
+                        sockets[i].close();
+                    } catch (Exception ex) {
+                        // Ignore the error.
+                    }
+                }
+            }
+        }
+        return ports;
+    }
+}
\ No newline at end of file

Copied: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml (from r1456570, logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml)
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml?p2=logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml&p1=logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml&r1=1456570&r2=1457624&rev=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml Mon Mar 18 03:15:41 2013
@@ -1,10 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<configuration status="warn" name="MyApp" packages="">
+<configuration status="info" name="MyApp" packages="">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true" dataDir="InMemory">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="persistent" dataDir="target/persistent">
       <Agent host="localhost" port="${sys:primaryPort}"/>
       <Agent host="localhost" port="${sys:alternatePort}"/>
       <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+      <Property name="password">Test123!!</Property>
     </Flume>
     <Console name="STDOUT">
       <PatternLayout pattern="%d %t - [%p] %c %m%n"/>

Modified: logging/log4j/log4j2/trunk/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/pom.xml?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/pom.xml (original)
+++ logging/log4j/log4j2/trunk/pom.xml Mon Mar 18 03:15:41 2013
@@ -193,6 +193,11 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.sleepycat</groupId>
+        <artifactId>je</artifactId>
+        <version>5.0.73</version>
+      </dependency>
+      <dependency>
         <groupId>org.osgi</groupId>
         <artifactId>core</artifactId>
         <version>4.3.0</version>
@@ -517,7 +522,14 @@
       </plugin>
     </plugins>
   </reporting>
-
+  <repositories>
+    <repository>
+      <id>oracleReleases</id>
+      <name>Oracle Released Java Packages</name>
+      <url>http://download.oracle.com/maven</url>
+      <layout>default</layout>
+    </repository>
+  </repositories>
   <distributionManagement>
      <repository>
       <id>apache.releases.https</id>

Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Mon Mar 18 03:15:41 2013
@@ -23,6 +23,9 @@
 
   <body>
     <release version="2.0-beta5" date="@TBD@" description="Bug fixes and enhancements">
+      <action dev="rgoers" type="add">
+        Added FlumePersistentManager which writes to BerkeleyDB and then writes to Flume asynchronously.
+      </action>
       <action issue="LOG4J2-175" dev="sdeboy" type="fix">
         Plugin cache should be reset when addPackages is called.
       </action>

Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1457624&r1=1457623&r2=1457624&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Mon Mar 18 03:15:41 2013
@@ -370,11 +370,13 @@
             from many different sources to a centralized data store. The FlumeAppender takes LogEvents and sends
             them to a Flume agent as serialized Avro events for consumption.</p>
           <p>
-            The Flume Appender supports two modes of operation.
+            The Flume Appender supports three modes of operation.
             <ol>
               <li>It can act as a remote Flume client which sends Flume events via Avro to a Flume Agent configured
               with an Avro Source.</li>
               <li>It can act as an embedded Flume Agent where Flume events pass directly into Flume for processing.</li>
+              <li>It can persist events to a local BerkeleyDB datastore and then asynchronously send the events to
+              Flume, similar to the embedded Flume Agent but without most of the Flume dependencies.</li>
             </ol>
             Usage as an embedded agent will cause the messages to be directly passed to the Flume Channel and then
             control will be immediately returned to the application. All interaction with remote agents will occur
@@ -420,13 +422,6 @@
                 to true and Agent elements are used instead of Property elements.</td>
             </tr>
             <tr>
-              <td>embedded</td>
-              <td>boolean</td>
-              <td>When set to true the embedded Flume agent will be used. When Agent elements are used the events
-                will be sent to a file channel and then routed to a FailoverSinkProcessor which will use
-                each configured agent in the order they are declared.</td>
-            </tr>
-            <tr>
               <td>filter</td>
               <td>Filter</td>
               <td>A Filter to determine if the event should be handled by this Appender. More than one Filter
@@ -482,10 +477,16 @@
             <tr>
               <td>properties</td>
               <td>Property[]</td>
-              <td>One or more Property elements that are used to configure the Flume Agent. The properties must be
+              <td><p>One or more Property elements that are used to configure the Flume Agent. The properties must be
                 configured without the agent name (the appender name is used for this) and no sources can be
                 configured. All other Flume configuration properties are allowed. Specifying both Agent and Property
-                elements will result in an error.</td>
+                elements will result in an error.</p>
+                <p>When the appender is configured in Persistent mode the valid properties are:
+                  <ol>
+                  <li>"password" to specify that
+                the data should be encrypted when written to disk.</li>
+                </ol></p>
+              </td>
             </tr>
             <tr>
               <td>reconnectionDelay</td>
@@ -500,7 +501,12 @@
               <td>The default is true, causing exceptions to be internally logged and then ignored. When set to
                 false exceptions will be percolated to the caller.</td>
             </tr>
-            <caption align="top">FlumeAvroAppender Parameters</caption>
+            <tr>
+              <td>type</td>
+              <td>enumeration</td>
+              <td>One of "Avro", "Embedded", or "Persistent" to indicate which variation of the Appender is desired.</td>
+            </tr>
+            <caption align="top">FlumeAppender Parameters</caption>
           </table>
             <p>
               A sample FlumeAppender configuration that is configured with a primary and a secondary agent,
@@ -524,13 +530,34 @@
           </p>
           <p>
             A sample FlumeAppender configuration that is configured with a primary and a secondary agent,
+            compresses the body, formats the body using the RFC5424Layout, and persists encrypted events to disk:
+
+            <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<configuration status="warn" name="MyApp" packages="">
+  <appenders>
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="persistent" dataDir="./logData">
+      <Agent host="192.168.10.101" port="8800"/>
+      <Agent host="192.168.10.102" port="8800"/>
+      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
+      <Property name="password">Test123!!</Property>
+    </Flume>
+  </appenders>
+  <loggers>
+    <root level="error">
+      <appender-ref ref="eventLogger"/>
+    </root>
+  </loggers>
+</configuration>]]></pre>
+          </p>
+          <p>
+            A sample FlumeAppender configuration that is configured with a primary and a secondary agent,
             compresses the body, formats the body using RFC5424Layout and passes the events to an embedded Flume
             Agent.
           </p>
           <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
 <configuration status="warn" name="MyApp" packages="">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="Embedded">
       <Agent host="192.168.10.101" port="8800"/>
       <Agent host="192.168.10.102" port="8800"/>
       <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
@@ -556,7 +583,7 @@
           <pre class="prettyprint linenums"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
 <configuration status="error" name="MyApp" packages="">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="Embedded">
       <Property name="channels">file</Property>
       <Property name="channels.file.type">file</Property>
       <Property name="channels.file.checkpointDir">target/file-channel/checkpoint</Property>



Mime
View raw message