logging-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgo...@apache.org
Subject svn commit: r1484841 - in /logging/log4j/log4j2/trunk/flume-ng/src: main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java test/resources/persistent.xml
Date Tue, 21 May 2013 15:48:41 GMT
Author: rgoers
Date: Tue May 21 15:48:41 2013
New Revision: 1484841

URL: http://svn.apache.org/r1484841
Log:
Use Berkeley DB transactions. Fix the logic that waits for a full batch or for the delay time to expire.

Modified:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1484841&r1=1484840&r2=1484841&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
(original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
Tue May 21 15:48:41 2013
@@ -17,6 +17,7 @@
 package org.apache.logging.log4j.flume.appender;
 
 import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
@@ -25,6 +26,7 @@ import com.sleepycat.je.EnvironmentConfi
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
 import org.apache.flume.Event;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.logging.log4j.LoggingException;
@@ -67,6 +69,8 @@ public class FlumePersistentManager exte
 
     private Database database;
 
+    private Environment environment;
+
     private final WriterThread worker;
 
     private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
@@ -85,11 +89,12 @@ public class FlumePersistentManager exte
     protected FlumePersistentManager(final String name, final String shortName, final Agent[]
agents,
                                      final int batchSize, final int retries, final int connectionTimeout,
                                      final int requestTimeout, final int delay, final Database
database,
-                                     SecretKey secretKey) {
+                                     final Environment environment, SecretKey secretKey)
{
         super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
         this.delay = delay;
         this.database = database;
-        this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
+        this.environment = environment;
+        this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey);
         this.worker.start();
         this.secretKey = secretKey;
     }
@@ -155,8 +160,17 @@ public class FlumePersistentManager exte
             }
             final DatabaseEntry key = new DatabaseEntry(keyData);
             final DatabaseEntry data = new DatabaseEntry(eventData);
-            database.put(null, key, data);
-            queue.add(keyData);
+            Transaction txn = environment.beginTransaction(null, null);
+            try {
+                database.put(txn, key, data);
+                txn.commit();
+                queue.add(keyData);
+            } catch (Exception ex) {
+                if (txn != null) {
+                    txn.abort();
+                }
+                throw ex;
+            }
         } catch (Exception ex) {
             throw new LoggingException("Exception occurred writing log event", ex);
         }
@@ -177,6 +191,12 @@ public class FlumePersistentManager exte
         } catch (final Exception ex) {
             LOGGER.warn("Failed to close database", ex);
         }
+        try {
+            environment.cleanLog();
+            environment.close();
+        } catch (final Exception ex) {
+            LOGGER.warn("Failed to close environment", ex);
+        }
         super.releaseSub();
     }
 
@@ -237,6 +257,7 @@ public class FlumePersistentManager exte
             SecretKey secretKey = null;
 
             Database database;
+            Environment environment;
 
             Map<String, String> properties = new HashMap<String, String>();
             if (data.properties != null) {
@@ -250,11 +271,12 @@ public class FlumePersistentManager exte
                 File dir = new File(data.dataDir);
                 FileUtils.mkdir(dir, true);
                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
-                dbEnvConfig.setTransactional(false);
+                dbEnvConfig.setTransactional(true);
                 dbEnvConfig.setAllowCreate(true);
-                final Environment environment = new Environment(dir, dbEnvConfig);
+                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
+                environment = new Environment(dir, dbEnvConfig);
                 final DatabaseConfig dbConfig = new DatabaseConfig();
-                dbConfig.setTransactional(false);
+                dbConfig.setTransactional(true);
                 dbConfig.setAllowCreate(true);
                 database = environment.openDatabase(null, name, dbConfig);
             } catch (final Exception ex) {
@@ -301,21 +323,23 @@ public class FlumePersistentManager exte
                 LOGGER.warn("Error setting up encryption - encryption will be disabled",
ex);
             }
             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize,
data.retries,
-                data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
+                data.connectionTimeout, data.requestTimeout, data.delay, database, environment,
secretKey);
         }
     }
 
     private static class WriterThread extends Thread  {
         private volatile boolean shutdown = false;
         private final Database database;
+        private final Environment environment;
         private final FlumePersistentManager manager;
         private final LinkedBlockingQueue<byte[]> queue;
         private final SecretKey secretKey;
         private final int batchSize;
 
-        public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]>
queue,
-                            int batchsize, SecretKey secretKey) {
+        public WriterThread(Database database, Environment environment, FlumePersistentManager
manager,
+                            LinkedBlockingQueue<byte[]> queue, int batchsize, SecretKey
secretKey) {
             this.database = database;
+            this.environment = environment;
             this.manager = manager;
             this.queue = queue;
             this.batchSize = batchsize;
@@ -337,76 +361,100 @@ public class FlumePersistentManager exte
 
         @Override
         public void run() {
-            LOGGER.trace("WriterThread started");
-            long lastBatch = System.currentTimeMillis();
+            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay =
" + manager.delay);
+            long nextBatch = System.currentTimeMillis() + manager.delay;
             while (!shutdown) {
-                if (database.count() >= batchSize ||
-                    database.count() > 0 && lastBatch + manager.delay > System.currentTimeMillis())
{
-                    lastBatch = System.currentTimeMillis();
+                long now = System.currentTimeMillis();
+                if (database.count() >= batchSize || (database.count() > 0 &&
nextBatch < now)) {
+                    nextBatch = now + manager.delay;
                     try {
                         boolean errors = false;
                         DatabaseEntry key = new DatabaseEntry();
                         final DatabaseEntry data = new DatabaseEntry();
-                        final Cursor cursor = database.openCursor(null, null);
-                        try {
-                            queue.clear();
-                            OperationStatus status;
+
+                        queue.clear();
+                        OperationStatus status;
+                        if (batchSize > 1) {
+                            Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
                             try {
-                                status = cursor.getFirst(key, data, LockMode.RMW);
-                                if (batchSize > 1) {
-                                    BatchEvent batch = new BatchEvent();
-                                    for (int i = 0; status == OperationStatus.SUCCESS &&
i < batchSize; ++i) {
-                                        SimpleEvent event = createEvent(data);
-                                        if (event != null) {
-                                            batch.addEvent(event);
-                                        }
-                                        status = cursor.getNext(key, data, LockMode.RMW);
-                                    }
-                                    try {
-                                        manager.send(batch);
-                                    } catch (Exception ioe) {
-                                        LOGGER.error("Error sending events", ioe);
-                                        break;
+                                status = cursor.getFirst(key, data, null);
+
+                                BatchEvent batch = new BatchEvent();
+                                for (int i = 0; status == OperationStatus.SUCCESS &&
i < batchSize; ++i) {
+                                    SimpleEvent event = createEvent(data);
+                                    if (event != null) {
+                                        batch.addEvent(event);
                                     }
+                                    status = cursor.getNext(key, data, null);
+                                }
+                                try {
+                                    manager.send(batch);
+                                } catch (Exception ioe) {
+                                    LOGGER.error("Error sending events", ioe);
+                                    break;
+                                }
+                                cursor.close();
+                                cursor = null;
+                                Transaction txn = environment.beginTransaction(null, null);
+                                try {
                                     for (Event event : batch.getEvents()) {
                                         try {
                                             Map<String, String> headers = event.getHeaders();
                                             key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
-                                            database.delete(null, key);
+                                            database.delete(txn, key);
                                         } catch (Exception ex) {
                                             LOGGER.error("Error deleting key from database",
ex);
                                         }
                                     }
-                                } else {
-                                    while (status == OperationStatus.SUCCESS) {
-                                        SimpleEvent event = createEvent(data);
-                                        if (event != null) {
+                                    txn.commit();
+                                } catch (Exception ex) {
+                                    LOGGER.error("Unable to commit transaction", ex);
+                                    if (txn != null) {
+                                        txn.abort();
+                                    }
+                                }
+                            } catch (Exception ex) {
+                                LOGGER.error("Error reading database", ex);
+                                shutdown = true;
+                                break;
+                            } finally {
+                                if (cursor != null) {
+                                    cursor.close();
+                                }
+                            }
+                        } else {
+                            Cursor cursor = database.openCursor(null, null);
+                            try {
+                                status = cursor.getFirst(key, data, LockMode.RMW);
+                                while (status == OperationStatus.SUCCESS) {
+                                    SimpleEvent event = createEvent(data);
+                                    if (event != null) {
+                                        try {
+                                            manager.doSend(event);
+                                        } catch (Exception ioe) {
+                                            errors = true;
+                                            LOGGER.error("Error sending event", ioe);
+                                            break;
+                                        }
+                                        if (!errors) {
                                             try {
-                                                manager.doSend(event);
-                                            } catch (Exception ioe) {
-                                                errors = true;
-                                                LOGGER.error("Error sending event", ioe);
-                                                break;
-                                            }
-                                            if (!errors) {
-                                                try {
-                                                    cursor.delete();
-                                                } catch (Exception ex) {
-                                                    LOGGER.error("Unable to delete event",
ex);
-                                                }
+                                                cursor.delete();
+                                            } catch (Exception ex) {
+                                                LOGGER.error("Unable to delete event", ex);
                                             }
                                         }
-                                        status = cursor.getNext(key, data, LockMode.RMW);
                                     }
+                                    status = cursor.getNext(key, data, LockMode.RMW);
                                 }
                             } catch (Exception ex) {
                                 LOGGER.error("Error reading database", ex);
                                 shutdown = true;
                                 break;
+                            } finally {
+                                if (cursor != null) {
+                                    cursor.close();
+                                }
                             }
-
-                        } finally {
-                            cursor.close();
                         }
                         if (errors) {
                             Thread.sleep(manager.delay);
@@ -416,18 +464,22 @@ public class FlumePersistentManager exte
                         LOGGER.warn("WriterThread encountered an exception. Continuing.",
ex);
                     }
                 } else {
-                    try {
-                        if (database.count() >= batchSize) {
-                            continue;
+                    while (!shutdown && database.count() < batchSize &&
nextBatch > now) {
+                        try {
+                            long interval = nextBatch - now;
+                            queue.poll(interval, TimeUnit.MILLISECONDS);
+                        } catch (InterruptedException ie) {
+                            LOGGER.warn("WriterThread interrupted, continuing");
+                        } catch (Exception ex) {
+                            LOGGER.error("WriterThread encountered an exception waiting for
work", ex);
+                            break;
+                        }
+                        now = System.currentTimeMillis();
+                        if (database.count() == 0) {
+                            nextBatch = now + manager.delay;
                         }
-                        queue.poll(manager.delay, TimeUnit.MILLISECONDS);
-                        LOGGER.debug("WriterThread notified of work");
-                    } catch (InterruptedException ie) {
-                        LOGGER.warn("WriterThread interrupted, continuing");
-                    } catch (Exception ex) {
-                        LOGGER.error("WriterThread encountered an exception waiting for work",
ex);
-                        break;
                     }
+                    LOGGER.debug("WriterThread ready to work");
                 }
             }
             LOGGER.trace("WriterThread exiting");

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml?rev=1484841&r1=1484840&r2=1484841&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml Tue May 21 15:48:41
2013
@@ -2,7 +2,7 @@
 <configuration status="info" name="MyApp" packages="org.apache.logging.log4j.flume.test">
   <appenders>
     <Flume name="eventLogger" suppressExceptions="false" compress="true" type="persistent"
dataDir="target/persistent"
-        batchsize="100">
+        batchsize="100" maxDelay="500">
       <Agent host="localhost" port="${sys:primaryPort}"/>
       <Agent host="localhost" port="${sys:alternatePort}"/>
       <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>



Mime
View raw message