logging-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgo...@apache.org
Subject svn commit: r1509752 - in /logging/log4j/log4j2/trunk: flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java src/changes/changes.xml
Date Fri, 02 Aug 2013 15:59:23 GMT
Author: rgoers
Date: Fri Aug  2 15:59:22 2013
New Revision: 1509752

URL: http://svn.apache.org/r1509752
Log:
LOG4J2-335 - FlumePersistentManager's writer thread had high CPU usage.

Modified:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/src/changes/changes.xml

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1509752&r1=1509751&r2=1509752&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Fri Aug  2 15:59:22 2013
@@ -131,6 +131,10 @@ public class FlumeAvroManager extends Ab
         return requestTimeout;
     }
 
+    public int getBatchSize() {
+        return batchSize;
+    }
+
     public synchronized void send(final BatchEvent events) {
         if (rpcClient == null) {
             rpcClient = connect(agents, retries, connectTimeout, requestTimeout);

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1509752&r1=1509751&r2=1509752&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Fri Aug  2 15:59:22 2013
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import javax.crypto.Cipher;
 import javax.crypto.SecretKey;
@@ -74,6 +75,8 @@ public class FlumePersistentManager exte
 
     private static final int SHUTDOWN_WAIT = 60;
 
+    private static final int MILLIS_PER_SECOND = 1000;
+
     private static BDBManagerFactory factory = new BDBManagerFactory();
 
     private final Database database;
@@ -90,6 +93,8 @@ public class FlumePersistentManager exte
 
     private final ExecutorService threadPool;
 
+    private AtomicLong dbCount = new AtomicLong();
+
     /**
      * Constructor
      * @param name The unique name of this manager.
@@ -112,7 +117,8 @@ public class FlumePersistentManager exte
         this.delay = delay;
         this.database = database;
         this.environment = environment;
-        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey);
+        dbCount.set(database.count());
+        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount);
         this.worker.start();
         this.secretKey = secretKey;
         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
@@ -185,7 +191,7 @@ public class FlumePersistentManager exte
                 eventData = cipher.doFinal(eventData);
             }
             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
-                gate));
+                gate, dbCount, getBatchSize()));
             boolean interrupted = false;
             int count = 0;
             do {
@@ -206,6 +212,11 @@ public class FlumePersistentManager exte
     protected void releaseSub() {
         LOGGER.debug("Shutting down FlumePersistentManager");
         worker.shutdown();
+        try {
+            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
+        } catch(InterruptedException ie) {
+            // Ignore the exception and shutdown.
+        }
         threadPool.shutdown();
         try {
             threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
@@ -246,14 +257,18 @@ public class FlumePersistentManager exte
         private final Environment environment;
         private final Database database;
         private final Gate gate;
+        private final AtomicLong dbCount;
+        private final long batchSize;
 
         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
-                         final Database database, final Gate gate) {
+                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize) {
             this.keyData = keyData;
             this.eventData = eventData;
             this.environment = environment;
             this.database = database;
             this.gate = gate;
+            this.dbCount = dbCount;
+            this.batchSize = batchSize;
         }
 
         @Override
@@ -264,7 +279,9 @@ public class FlumePersistentManager exte
             try {
                 database.put(txn, key, data);
                 txn.commit();
-                gate.open();
+                if (dbCount.incrementAndGet() >= batchSize) {
+                    gate.open();
+                }
             } catch (final Exception ex) {
                 if (txn != null) {
                     txn.abort();
@@ -409,9 +426,11 @@ public class FlumePersistentManager exte
         private final Gate gate;
         private final SecretKey secretKey;
         private final int batchSize;
+        private final AtomicLong dbCounter;
 
         public WriterThread(final Database database, final Environment environment,
-                            final FlumePersistentManager manager, final Gate gate, final int batchsize, final SecretKey secretKey) {
+                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
+                            final SecretKey secretKey, final AtomicLong dbCount) {
             this.database = database;
             this.environment = environment;
             this.manager = manager;
@@ -419,6 +438,7 @@ public class FlumePersistentManager exte
             this.batchSize = batchsize;
             this.secretKey = secretKey;
             this.setDaemon(true);
+            this.dbCounter = dbCount;
         }
 
         public void shutdown() {
@@ -438,7 +458,8 @@ public class FlumePersistentManager exte
             while (!shutdown) {
                 long now = System.currentTimeMillis();
                 long dbCount = database.count();
-                if (dbCount >= batchSize || (dbCount > 0 && nextBatch < now)) {
+                dbCounter.set(dbCount);
+                if (dbCount >= batchSize || (dbCount > 0 && nextBatch <= now)) {
                     nextBatch = now + manager.delay;
                     try {
                         boolean errors = false;
@@ -448,54 +469,10 @@ public class FlumePersistentManager exte
                         gate.close();
                         OperationStatus status;
                         if (batchSize > 1) {
-                            Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
                             try {
-                                status = cursor.getFirst(key, data, null);
-
-                                final BatchEvent batch = new BatchEvent();
-                                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
-                                    final SimpleEvent event = createEvent(data);
-                                    if (event != null) {
-                                        batch.addEvent(event);
-                                    }
-                                    status = cursor.getNext(key, data, null);
-                                }
-                                try {
-                                    manager.send(batch);
-                                } catch (final Exception ioe) {
-                                    LOGGER.error("Error sending events", ioe);
-                                    errors = true;
-                                }
-                                if (!errors) {
-                                    cursor.close();
-                                    cursor = null;
-                                    final Transaction txn = environment.beginTransaction(null, null);
-                                    try {
-                                        for (final Event event : batch.getEvents()) {
-                                            try {
-                                                final Map<String, String> headers = event.getHeaders();
-                                                key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
-                                                database.delete(txn, key);
-                                            } catch (final Exception ex) {
-                                                LOGGER.error("Error deleting key from database", ex);
-                                            }
-                                        }
-                                        txn.commit();
-                                    } catch (final Exception ex) {
-                                        LOGGER.error("Unable to commit transaction", ex);
-                                        if (txn != null) {
-                                            txn.abort();
-                                        }
-                                    }
-                                }
+                                errors = sendBatch(key, data);
                             } catch (final Exception ex) {
-                                LOGGER.error("Error reading database", ex);
-                                shutdown = true;
                                 break;
-                            } finally {
-                                if (cursor != null) {
-                                    cursor.close();
-                                }
                             }
                         } else {
                             Transaction txn = environment.beginTransaction(null, null);
@@ -526,6 +503,7 @@ public class FlumePersistentManager exte
                                 }
                                 txn.commit();
                                 txn = null;
+                                dbCounter.decrementAndGet();
                             } catch (final Exception ex) {
                                 LOGGER.error("Error reading or writing to database", ex);
                                 shutdown = true;
@@ -547,26 +525,90 @@ public class FlumePersistentManager exte
                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                     }
                 } else {
-                    while (!shutdown && (dbCount == 0 || dbCount < batchSize && nextBatch > now)) {
-                        try {
-                            final long interval = nextBatch - now;
-                            gate.waitForOpen(interval, TimeUnit.MILLISECONDS);
-                        } catch (final InterruptedException ie) {
-                            LOGGER.warn("WriterThread interrupted, continuing");
-                        } catch (final Exception ex) {
-                            LOGGER.error("WriterThread encountered an exception waiting for work", ex);
-                            break;
+                    if (nextBatch <= now) {
+                        nextBatch = now + manager.delay;
+                    }
+                    try {
+                        final long interval = nextBatch - now;
+                        gate.waitForOpen(interval);
+                    } catch (final InterruptedException ie) {
+                        LOGGER.warn("WriterThread interrupted, continuing");
+                    } catch (final Exception ex) {
+                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
+                        break;
+                    }
+                }
+            }
+
+            if (batchSize > 1 && database.count() > 0) {
+                DatabaseEntry key = new DatabaseEntry();
+                final DatabaseEntry data = new DatabaseEntry();
+                try {
+                    sendBatch(key, data);
+                } catch (final Exception ex) {
+                    LOGGER.warn("Unable to write final batch");
+                }
+            }
+            LOGGER.trace("WriterThread exiting");
+        }
+
+        private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
+            boolean errors = false;
+            OperationStatus status;
+            Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
+            try {
+                status = cursor.getFirst(key, data, null);
+
+                final BatchEvent batch = new BatchEvent();
+                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
+                    final SimpleEvent event = createEvent(data);
+                    if (event != null) {
+                        batch.addEvent(event);
+                    }
+                    status = cursor.getNext(key, data, null);
+                }
+                try {
+                    manager.send(batch);
+                } catch (final Exception ioe) {
+                    LOGGER.error("Error sending events", ioe);
+                    errors = true;
+                }
+                if (!errors) {
+                    cursor.close();
+                    cursor = null;
+                    final Transaction txn = environment.beginTransaction(null, null);
+                    try {
+                        for (final Event event : batch.getEvents()) {
+                            try {
+                                final Map<String, String> headers = event.getHeaders();
+                                key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+                                database.delete(txn, key);
+                            } catch (final Exception ex) {
+                                LOGGER.error("Error deleting key from database", ex);
+                            }
+                        }
+                        txn.commit();
+                        long count = dbCounter.get();
+                        while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
+                            count = dbCounter.get();
                         }
-                        now = System.currentTimeMillis();
-                        dbCount = database.count();
-                        if (!gate.isSignalled()) {
-                            nextBatch = now + manager.delay;
+                    } catch (final Exception ex) {
+                        LOGGER.error("Unable to commit transaction", ex);
+                        if (txn != null) {
+                            txn.abort();
                         }
                     }
-                    LOGGER.debug("WriterThread ready to work");
+                }
+            } catch (final Exception ex) {
+                LOGGER.error("Error reading database", ex);
+                shutdown = true;
+                throw ex;
+            } finally {
+                if (cursor != null) {
+                    cursor.close();
                 }
             }
-            LOGGER.trace("WriterThread exiting");
+            return errors;
         }
 
         private SimpleEvent createEvent(final DatabaseEntry data) {
@@ -629,27 +671,24 @@ public class FlumePersistentManager exte
     }
 
     private static class Gate {
-        private static class Synchronizer extends AbstractQueuedSynchronizer {
-            boolean isSignalled() { return getState() != 0; }
 
-            @Override
-			protected int tryAcquireShared(int ignore) {
-                return isSignalled()? 1 : -1;
-            }
+        private boolean isOpen = false;
 
-            @Override
-			protected boolean tryReleaseShared(int state) {
-                setState(state);
-                return true;
-            }
+        public boolean isOpen() {
+            return isOpen;
+        }
+
+        public synchronized void open() {
+            isOpen = true;
+            notifyAll();
+        }
+
+        public synchronized void close() {
+            isOpen = false;
         }
 
-        private final Synchronizer sync = new Synchronizer();
-        public boolean isSignalled() { return sync.isSignalled(); }
-        public void open()         { sync.releaseShared(1); }
-        public void close()        { sync.releaseShared(0); }
-        public void waitForOpen(long timeout, TimeUnit timeUnit) throws InterruptedException {
-            sync.tryAcquireSharedNanos(1, timeUnit.toNanos(timeout));
+        public synchronized void waitForOpen(long timeout) throws InterruptedException {
+            wait(timeout);
         }
     }
 }

Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1509752&r1=1509751&r2=1509752&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Fri Aug  2 15:59:22 2013
@@ -21,6 +21,9 @@
   </properties>
   <body>
     <release version="2.0-beta9" date="soon, very soon" description="Bug fixes and enhancements">
+      <action issue="LOG4J2-335" dev="rgoers" type="fix">
+        FlumePersistentManager's writer thread had high CPU usage.
+      </action>
       <action issue="LOG4J2-331" dev="nickwilliams" type="fix">
         Removed erroneous check for affected MongoDB records, which always returns zero on inserts.
       </action>



Mime
View raw message