logging-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgo...@apache.org
Subject svn commit: r1462863 - in /logging/log4j/log4j2/trunk/flume-ng/src: main/java/org/apache/logging/log4j/flume/appender/ test/java/org/apache/logging/log4j/flume/appender/ test/resources/
Date Sat, 30 Mar 2013 23:45:45 GMT
Author: rgoers
Date: Sat Mar 30 23:45:45 2013
New Revision: 1462863

URL: http://svn.apache.org/r1462863
Log:
Add batch support to FlumePersistentManager

Added:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
Modified:
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml

Added: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java?rev=1462863&view=auto
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java (added)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java Sat Mar 30 23:45:45 2013
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import org.apache.flume.event.SimpleEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class BatchEvent {
+
+    private List<SimpleEvent> events = new ArrayList<SimpleEvent>();
+
+    public void addEvent(SimpleEvent event) {
+        events.add(event);
+    }
+
+    public List<SimpleEvent> getEvents() {
+        return events;
+    }
+}

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Sat Mar 30 23:45:45 2013
@@ -119,6 +119,67 @@ public class FlumeAvroManager extends Ab
         return current;
     }
 
+    public synchronized void send(final BatchEvent events) {
+        if (client == null) {
+            client = connect(agents);
+        }
+
+        if (client != null) {
+            final List<SimpleEvent> list = events.getEvents();
+            final List<AvroFlumeEvent> batch = new ArrayList<AvroFlumeEvent>(list.size());
+            for (SimpleEvent event : list) {
+                final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+                avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+                avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+
+                for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
+                    avroEvent.getHeaders().put(entry.getKey(), entry.getValue());
+                }
+                batch.add(avroEvent);
+            }
+
+            try {
+                final Status status = client.appendBatch(batch);
+                if (status.equals(Status.OK)) {
+                    return;
+                } else {
+                    LOGGER.warn("RPC communication failed to " + agents[current].getHost() +
+                        ":" + agents[current].getPort());
+                }
+            } catch (final Exception ex) {
+                String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
+                    agents[current].getPort();
+                LOGGER.warn(msg, ex);
+            }
+
+            for (int index = 0; index < agents.length; ++index) {
+                if (index == current) {
+                    continue;
+                }
+                final Agent agent = agents[index];
+                try {
+                    transceiver = null;
+                    final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
+                    final Status status = c.appendBatch(batch);
+                    if (!status.equals(Status.OK)) {
+                        final String warnMsg = "RPC communication failed to " + getName() + " at " +
+                            agent.getHost() + ":" + agent.getPort();
+                        LOGGER.warn(warnMsg);
+                        continue;
+                    }
+                    client = c;
+                    current = index;
+                    return;
+                } catch (final Exception ex) {
+                    final String warnMsg = "Unable to write to " + getName() + " at " + agent.getHost() + ":" +
+                        agent.getPort();
+                    LOGGER.warn(warnMsg, ex);
+                }
+            }
+        }
+        throw new AppenderRuntimeException("No Flume agents are available");
+    }
+
     @Override
     public synchronized void send(final SimpleEvent event, int delay, int retries)  {
         if (delay == 0) {
@@ -188,7 +249,7 @@ public class FlumeAvroManager extends Ab
                             continue;
                         }
                         client = c;
-                        current = i;
+                        current = index;
                         return;
                     } catch (final Exception ex) {
                         if (i == retries - 1) {

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Sat Mar 30 23:45:45 2013
@@ -45,6 +45,7 @@ import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -88,7 +89,7 @@ public class FlumePersistentManager exte
                                      SecretKey secretKey) {
         super(name, shortName, agents, batchSize);
         this.database = database;
-        this.worker = new WriterThread(database, this, queue, secretKey);
+        this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
         this.worker.start();
         this.reconnectionDelay = reconnectionDelay <= 0 ? DEFAULT_DELAY : reconnectionDelay;
         this.secretKey = secretKey;
@@ -113,7 +114,7 @@ public class FlumePersistentManager exte
         }
         String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
 
-        final StringBuilder sb = new StringBuilder("FlumeKrati[");
+        final StringBuilder sb = new StringBuilder("FlumePersistent[");
         boolean first = true;
         for (final Agent agent : agents) {
             if (!first) {
@@ -300,12 +301,14 @@ public class FlumePersistentManager exte
         private final FlumePersistentManager manager;
         private final LinkedBlockingQueue<byte[]> queue;
         private final SecretKey secretKey;
+        private final int batchSize;
 
         public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue,
-                            SecretKey secretKey) {
+                            int batchsize, SecretKey secretKey) {
             this.database = database;
             this.manager = manager;
             this.queue = queue;
+            this.batchSize = batchsize;
             this.secretKey = secretKey;
         }
 
@@ -323,93 +326,130 @@ public class FlumePersistentManager exte
         @Override
         public void run() {
             LOGGER.trace("WriterThread started");
+            long lastBatch = System.currentTimeMillis();
             while (!shutdown) {
-                try {
-                    boolean errors = false;
-                    final DatabaseEntry key = new DatabaseEntry();
-                    final DatabaseEntry data = new DatabaseEntry();
-                    final Cursor cursor = database.openCursor(null, null);
+                if (database.count() >= batchSize ||
+                    database.count() > 0 && lastBatch + manager.reconnectionDelay > System.currentTimeMillis()) {
                     try {
-                        queue.clear();
-                        OperationStatus status;
+                        boolean errors = false;
+                        DatabaseEntry key = new DatabaseEntry();
+                        final DatabaseEntry data = new DatabaseEntry();
+                        final Cursor cursor = database.openCursor(null, null);
                         try {
-                            status = cursor.getFirst(key, data, LockMode.RMW);
-
-                            while (status == OperationStatus.SUCCESS) {
-                                SimpleEvent event = new SimpleEvent();
-                                try {
-                                    byte[] eventData = data.getData();
-                                    if (secretKey != null) {
-                                        Cipher cipher = Cipher.getInstance("AES");
-                                        cipher.init(Cipher.DECRYPT_MODE, secretKey);
-                                        eventData = cipher.doFinal(eventData);
-                                    }
-                                    ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
-                                    DataInputStream dais = new DataInputStream(bais);
-                                    int length = dais.readInt();
-                                    byte[] bytes = new byte[length];
-                                    dais.read(bytes, 0, length);
-                                    event.setBody(bytes);
-                                    length = dais.readInt();
-                                    Map<String, String> map = new HashMap<String, String>(length);
-                                    for (int i = 0; i < length; ++i) {
-                                        String headerKey = dais.readUTF();
-                                        String value = dais.readUTF();
-                                        map.put(headerKey, value);
+                            queue.clear();
+                            OperationStatus status;
+                            try {
+                                status = cursor.getFirst(key, data, LockMode.RMW);
+                                if (batchSize > 1) {
+                                    BatchEvent batch = new BatchEvent();
+                                    while (status == OperationStatus.SUCCESS) {
+                                        SimpleEvent event = createEvent(data);
+                                        if (event != null) {
+                                            batch.addEvent(event);
+                                        }
+                                        status = cursor.getNext(key, data, LockMode.RMW);
                                     }
-                                    event.setHeaders(map);
-                                } catch (Exception ex) {
-                                    errors = true;
-                                    LOGGER.error("Error retrieving event", ex);
-                                    continue;
-                                }
-                                try {
-                                    manager.doSend(event);
-                                } catch (Exception ioe) {
-                                    errors = true;
-                                    LOGGER.error("Error sending event", ioe);
-                                    break;
-                                }
-                                if (!errors) {
                                     try {
-                                        cursor.delete();
-                                    } catch (Exception ex) {
-                                        LOGGER.error("Unable to delete event", ex);
+                                        manager.send(batch);
+                                        lastBatch = System.currentTimeMillis();
+                                    } catch (Exception ioe) {
+                                        LOGGER.error("Error sending events", ioe);
+                                        break;
+                                    }
+                                    for (SimpleEvent event : batch.getEvents()) {
+                                        try {
+                                            Map<String, String> headers = event.getHeaders();
+                                            key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+                                            database.delete(null, key);
+                                        } catch (Exception ex) {
+                                            LOGGER.error("Error deleting key from database", ex);
+                                        }
+                                    }
+                                } else {
+                                    while (status == OperationStatus.SUCCESS) {
+                                        SimpleEvent event = createEvent(data);
+                                        if (event != null) {
+                                            try {
+                                                manager.doSend(event);
+                                            } catch (Exception ioe) {
+                                                errors = true;
+                                                LOGGER.error("Error sending event", ioe);
+                                                break;
+                                            }
+                                            if (!errors) {
+                                                try {
+                                                    cursor.delete();
+                                                } catch (Exception ex) {
+                                                    LOGGER.error("Unable to delete event", ex);
+                                                }
+                                            }
+                                        }
+                                        status = cursor.getNext(key, data, LockMode.RMW);
                                     }
                                 }
-                                status = cursor.getNext(key, data, LockMode.RMW);
+                            } catch (Exception ex) {
+                                LOGGER.error("Error reading database", ex);
+                                shutdown = true;
+                                break;
                             }
-                        } catch (Exception ex) {
-                            LOGGER.error("Error reading database", ex);
-                            shutdown = true;
-                            break;
-                        }
 
-                    } finally {
-                        cursor.close();
-                    }
-                    if (errors) {
-                        Thread.sleep(manager.reconnectionDelay);
-                        continue;
+                        } finally {
+                            cursor.close();
+                        }
+                        if (errors) {
+                            Thread.sleep(manager.reconnectionDelay);
+                            continue;
+                        }
+                    } catch (Exception ex) {
+                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                     }
-                } catch (Exception ex) {
-                    LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
-                }
-                try {
-                    if (database.count() > 0) {
-                        continue;
+                } else {
+                    try {
+                        if (database.count() >= batchSize) {
+                            continue;
+                        }
+                        queue.poll(manager.reconnectionDelay, TimeUnit.MILLISECONDS);
+                        LOGGER.debug("WriterThread notified of work");
+                    } catch (InterruptedException ie) {
+                        LOGGER.warn("WriterThread interrupted, continuing");
+                    } catch (Exception ex) {
+                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
+                        break;
                     }
-                    queue.take();
-                    LOGGER.debug("WriterThread notified of work");
-                } catch (InterruptedException ie) {
-                    LOGGER.warn("WriterThread interrupted, continuing");
-                } catch (Exception ex) {
-                    LOGGER.error("WriterThread encountered an exception waiting for work", ex);
-                    break;
                 }
             }
             LOGGER.trace("WriterThread exiting");
         }
 
+        private SimpleEvent createEvent(DatabaseEntry data) {
+            SimpleEvent event = new SimpleEvent();
+            try {
+                byte[] eventData = data.getData();
+                if (secretKey != null) {
+                    Cipher cipher = Cipher.getInstance("AES");
+                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
+                    eventData = cipher.doFinal(eventData);
+                }
+                ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
+                DataInputStream dais = new DataInputStream(bais);
+                int length = dais.readInt();
+                byte[] bytes = new byte[length];
+                dais.read(bytes, 0, length);
+                event.setBody(bytes);
+                length = dais.readInt();
+                Map<String, String> map = new HashMap<String, String>(length);
+                for (int i = 0; i < length; ++i) {
+                    String headerKey = dais.readUTF();
+                    String value = dais.readUTF();
+                    map.put(headerKey, value);
+                }
+                event.setHeaders(map);
+                return event;
+            } catch (Exception ex) {
+                LOGGER.error("Error retrieving event", ex);
+                return null;
+            }
+        }
+
     }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java Sat Mar 30 23:45:45 2013
@@ -215,7 +215,7 @@ public class FlumePersistentAppenderTest
             Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]);
         }
     }
-
+    /*
     @Test
     public void testPerformance() throws Exception {
         long start = System.currentTimeMillis();
@@ -227,7 +227,7 @@ public class FlumePersistentAppenderTest
         }
         long elapsed = System.currentTimeMillis() - start;
         System.out.println("Time to log " + count + " events " + elapsed + "ms");
-    }
+    }    */
 
 
     private String getBody(final Event event) throws IOException {
@@ -292,12 +292,15 @@ public class FlumePersistentAppenderTest
 
         public Status append(AvroFlumeEvent event) throws AvroRemoteException {
             eventQueue.add(event);
+            //System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
             return Status.OK;
         }
 
-        public Status appendBatch(List<AvroFlumeEvent> events)
-            throws AvroRemoteException {
+        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
             Preconditions.checkState(eventQueue.addAll(events));
+            for (AvroFlumeEvent event : events) {
+               // System.out.println("Received event " + event.getHeaders().get(new org.apache.avro.util.Utf8(FlumeEvent.GUID)));
+            }
             return Status.OK;
         }
     }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml?rev=1462863&r1=1462862&r2=1462863&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/persistent.xml Sat Mar 30 23:45:45 2013
@@ -1,7 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration status="info" name="MyApp" packages="org.apache.logging.log4j.flume.test">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="persistent" dataDir="target/persistent">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="persistent" dataDir="target/persistent"
+        batchsize="100">
       <Agent host="localhost" port="${sys:primaryPort}"/>
       <Agent host="localhost" port="${sys:alternatePort}"/>
       <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>



Mime
View raw message