logging-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgo...@apache.org
Subject svn commit: r1468306 - in /logging/log4j/log4j2/trunk: flume-ng/ flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/ flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/ flume-ng/src/test/resources/ src/changes/ src/site/xdoc/man...
Date Tue, 16 Apr 2013 06:03:45 GMT
Author: rgoers
Date: Tue Apr 16 06:03:44 2013
New Revision: 1468306

URL: http://svn.apache.org/r1468306
Log:
LOG4J2-196, LOG4J2-198 - Use Flume RPCClient - upgrade to Flume 1.3.1

Modified:
    logging/log4j/log4j2/trunk/flume-ng/pom.xml
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
    logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml
    logging/log4j/log4j2/trunk/src/changes/changes.xml
    logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml

Modified: logging/log4j/log4j2/trunk/flume-ng/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/pom.xml?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/pom.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/pom.xml Tue Apr 16 06:03:44 2013
@@ -32,7 +32,7 @@
     <log4jParentDir>${basedir}/..</log4jParentDir>
     <docLabel>Flume Documentation</docLabel>
     <projectDir>/flume-ng</projectDir>
-    <flumeVersion>1.2.0</flumeVersion>
+    <flumeVersion>1.3.1</flumeVersion>
   </properties>
   <dependencies>
     <dependency>

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java Tue Apr 16 06:03:44 2013
@@ -16,7 +16,7 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
-import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.Event;
 import org.apache.logging.log4j.core.appender.AbstractManager;
 
 /**
@@ -28,5 +28,5 @@ public abstract class AbstractFlumeManag
         super(name);
     }
 
-    public abstract void send(SimpleEvent event, int delay, int retries);
+    public abstract void send(Event event);
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java Tue Apr 16 06:03:44 2013
@@ -16,7 +16,7 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
-import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.Event;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,13 +26,13 @@ import java.util.List;
  */
 public class BatchEvent {
 
-    private List<SimpleEvent> events = new ArrayList<SimpleEvent>();
+    private List<Event> events = new ArrayList<Event>();
 
-    public void addEvent(SimpleEvent event) {
+    public void addEvent(Event event) {
         events.add(event);
     }
 
-    public List<SimpleEvent> getEvents() {
+    public List<Event> getEvents() {
         return events;
     }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java Tue Apr 16 06:03:44 2013
@@ -47,10 +47,6 @@ public final class FlumeAppender extends
 
     private final boolean compressBody;
 
-    private final int reconnectDelay;
-
-    private final int retries;
-
     private final FlumeEventFactory factory;
 
     private enum ManagerType {
@@ -63,7 +59,7 @@ public final class FlumeAppender extends
 
     private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
                           final String includes, final String excludes, final String required, final String mdcPrefix,
-                          final String eventPrefix, final boolean compress, final int delay, final int retries,
+                          final String eventPrefix, final boolean compress,
                           final FlumeEventFactory factory, final AbstractFlumeManager manager) {
         super(name, filter, layout, handleException);
         this.manager = manager;
@@ -73,8 +69,6 @@ public final class FlumeAppender extends
         this.eventPrefix = eventPrefix;
         this.mdcPrefix = mdcPrefix;
         this.compressBody = compress;
-        this.reconnectDelay = delay;
-        this.retries = retries;
         this.factory = factory == null ? this : factory;
     }
 
@@ -87,7 +81,7 @@ public final class FlumeAppender extends
         final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
             eventPrefix, compressBody);
         flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
-        manager.send(flumeEvent, reconnectDelay, retries);
+        manager.send(flumeEvent);
     }
 
     @Override
@@ -122,8 +116,11 @@ public final class FlumeAppender extends
      * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
      * @param type Avro (default), Embedded, or Persistent.
      * @param dataDir The directory where the Flume FileChannel should write its data.
-     * @param delay The amount of time in milliseconds to wait between retries.
+     * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
+     *                          1000.
+     * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
      * @param agentRetries The number of times to retry an agent before failing to the next agent.
+     * @param maxDelay The maximum number of milliseconds to wait for a complete batch.
      * @param name The name of the Appender.
      * @param suppress If true exceptions will be handled in the appender.
      * @param excludes A comma separated list of MDC elements to exclude.
@@ -144,8 +141,10 @@ public final class FlumeAppender extends
                                                    @PluginAttr("embedded") final String embedded,
                                                    @PluginAttr("type") final String type,
                                                    @PluginAttr("dataDir") final String dataDir,
-                                                   @PluginAttr("reconnectionDelay") final String delay,
+                                                   @PluginAttr("connectTimeout") final String connectionTimeout,
+                                                   @PluginAttr("requestTimeout") final String requestTimeout,
                                                    @PluginAttr("agentRetries") final String agentRetries,
+                                                   @PluginAttr("maxDelay") final String maxDelay,
                                                    @PluginAttr("name") final String name,
                                                    @PluginAttr("suppressExceptions") final String suppress,
                                                    @PluginAttr("mdcExcludes") final String excludes,
@@ -165,7 +164,7 @@ public final class FlumeAppender extends
         final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
         ManagerType managerType;
         if (type != null) {
-            if (embed) {
+            if (embed && embedded != null) {
                 try {
                     managerType = ManagerType.getType(type);
                     LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
@@ -188,8 +187,11 @@ public final class FlumeAppender extends
         }
 
         final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
-        final int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
+        final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
+        final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
         final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
+        final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
+
 
         if (layout == null) {
             layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes,
@@ -212,14 +214,15 @@ public final class FlumeAppender extends
                     LOGGER.debug("No agents provided, using defaults");
                     agents = new Agent[] {Agent.createAgent(null, null)};
                 }
-                manager = FlumeAvroManager.getManager(name, agents, batchCount);
+                manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
                 break;
             case PERSISTENT:
                 if (agents == null || agents.length == 0) {
                     LOGGER.debug("No agents provided, using defaults");
                     agents = new Agent[] {Agent.createAgent(null, null)};
                 }
-                manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, reconnectDelay, dataDir);
+                manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
+                    connectTimeout, reqTimeout, delay, dataDir);
                 break;
             default:
                 LOGGER.debug("No manager type specified. Defaulting to AVRO");
@@ -227,7 +230,7 @@ public final class FlumeAppender extends
                     LOGGER.debug("No agents provided, using defaults");
                     agents = new Agent[] {Agent.createAgent(null, null)};
                 }
-                manager = FlumeAvroManager.getManager(name, agents, batchCount);
+                manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
         }
 
         if (manager == null) {
@@ -235,6 +238,6 @@ public final class FlumeAppender extends
         }
 
         return new FlumeAppender(name, filter, layout,  handleExceptions, includes,
-            excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
+            excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
     }
 }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Tue Apr 16 06:03:44 2013
@@ -16,62 +16,56 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
-import org.apache.flume.source.avro.Status;
+import org.apache.flume.Event;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
 import org.apache.logging.log4j.core.appender.ManagerFactory;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Properties;
 
 /**
  * Manager for FlumeAvroAppenders.
  */
 public class FlumeAvroManager extends AbstractFlumeManager {
 
-    /**
-      The default reconnection delay (500 milliseconds or .5 seconds).
-     */
-    public static final int DEFAULT_RECONNECTION_DELAY   = 500;
-
-    private static final int DEFAULT_RECONNECTS = 3;
+    private static final int MAX_RECONNECTS = 3;
 
     private static ManagerFactory factory = new AvroManagerFactory();
 
-    private AvroSourceProtocol client;
-
     private final Agent[] agents;
 
     private final int batchSize;
 
-    private final EventList events = new EventList();
+    private final int retries;
+
+    private final int connectTimeout;
+
+    private final int requestTimeout;
 
     private int current = 0;
 
-    private Transceiver transceiver;
+    private RpcClient rpcClient = null;
 
     /**
      * Constructor
      * @param name The unique name of this manager.
      * @param agents An array of Agents.
      * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectTimeout The connection timeout in ms.
+     * @param requestTimeout The request timeout in ms.
+     *
      */
-    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize) {
+    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
+                               final int retries, final int connectTimeout, final int requestTimeout) {
         super(name);
         this.agents = agents;
         this.batchSize = batchSize;
-        this.client = connect(agents);
+        this.retries = retries;
+        this.connectTimeout = connectTimeout;
+        this.requestTimeout = requestTimeout;
+        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
     }
 
     /**
@@ -81,7 +75,8 @@ public class FlumeAvroManager extends Ab
      * @param batchSize The number of events to include in a batch.
      * @return A FlumeAvroManager.
      */
-    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize) {
+    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
+                                              final int retries, final int connectTimeout, final int requestTimeout) {
         if (agents == null || agents.length == 0) {
             throw new IllegalArgumentException("At least one agent is required");
         }
@@ -100,7 +95,8 @@ public class FlumeAvroManager extends Ab
             first = false;
         }
         sb.append("]");
-        return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
+        return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+            connectTimeout, requestTimeout));
     }
 
     /**
@@ -119,160 +115,54 @@ public class FlumeAvroManager extends Ab
         return current;
     }
 
+    public int getRetries() {
+        return retries;
+    }
+
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public int getRequestTimeout() {
+        return requestTimeout;
+    }
+
     public synchronized void send(final BatchEvent events) {
-        if (client == null) {
-            client = connect(agents);
+        if (rpcClient == null) {
+            rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
         }
 
-        if (client != null) {
-            final List<SimpleEvent> list = events.getEvents();
-            final List<AvroFlumeEvent> batch = new ArrayList<AvroFlumeEvent>(list.size());
-            for (SimpleEvent event : list) {
-                final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-                avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
-                avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
-
-                for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
-                    avroEvent.getHeaders().put(entry.getKey(), entry.getValue());
-                }
-                batch.add(avroEvent);
-            }
-
+        if (rpcClient != null) {
             try {
-                final Status status = client.appendBatch(batch);
-                if (status.equals(Status.OK)) {
-                    return;
-                } else {
-                    LOGGER.warn("RPC communication failed to " + agents[current].getHost() +
-                        ":" + agents[current].getPort());
-                }
+                rpcClient.appendBatch(events.getEvents());
             } catch (final Exception ex) {
+                rpcClient.close();
+                rpcClient = null;
                 String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
                     agents[current].getPort();
                 LOGGER.warn(msg, ex);
-            }
-
-            for (int index = 0; index < agents.length; ++index) {
-                if (index == current) {
-                    continue;
-                }
-                final Agent agent = agents[index];
-                try {
-                    transceiver = null;
-                    final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
-                    final Status status = c.appendBatch(batch);
-                    if (!status.equals(Status.OK)) {
-                        final String warnMsg = "RPC communication failed to " + getName() + " at " +
-                            agent.getHost() + ":" + agent.getPort();
-                        LOGGER.warn(warnMsg);
-                        continue;
-                    }
-                    client = c;
-                    current = index;
-                    return;
-                } catch (final Exception ex) {
-                    final String warnMsg = "Unable to write to " + getName() + " at " + agent.getHost() + ":" +
-                        agent.getPort();
-                    LOGGER.warn(warnMsg, ex);
-                }
+                throw new AppenderRuntimeException("No Flume agents are available");
             }
         }
-        throw new AppenderRuntimeException("No Flume agents are available");
     }
 
     @Override
-    public synchronized void send(final SimpleEvent event, int delay, int retries)  {
-        if (delay == 0) {
-            delay = DEFAULT_RECONNECTION_DELAY;
-        }
-        if (retries == 0) {
-            retries = DEFAULT_RECONNECTS;
-        }
-        if (client == null) {
-            client = connect(agents);
-        }
-        String msg = "No Flume agents are available";
-        if (client != null) {
-            final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-            avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
-            avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
-
-            for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
-                avroEvent.getHeaders().put(entry.getKey(), entry.getValue());
-            }
-
-            final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
-            if (batch == null && batchSize > 1) {
-                return;
-            }
-
-            int i = 0;
-
-            msg = "Error writing to " + getName();
-
-            do {
-                try {
-                    final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
-                    if (!status.equals(Status.OK)) {
-                        throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
-                            ":" + agents[current].getPort());
-                    }
-                    return;
-                } catch (final Exception ex) {
-                    if (i == retries - 1) {
-                        msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
-                            agents[current].getPort();
-                        LOGGER.warn(msg, ex);
-                        break;
-                    }
-                    sleep(delay);
-                }
-            } while (++i < retries);
-
-            for (int index = 0; index < agents.length; ++index) {
-                if (index == current) {
-                    continue;
-                }
-                final Agent agent = agents[index];
-                i = 0;
-                do {
-                    try {
-                        transceiver = null;
-                        final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
-                        final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
-                        if (!status.equals(Status.OK)) {
-                            if (i == retries - 1) {
-                                final String warnMsg = "RPC communication failed to " + getName() + " at " +
-                                    agent.getHost() + ":" + agent.getPort();
-                                LOGGER.warn(warnMsg);
-                            }
-                            continue;
-                        }
-                        client = c;
-                        current = index;
-                        return;
-                    } catch (final Exception ex) {
-                        if (i == retries - 1) {
-                            final String warnMsg = "Unable to write to " + getName() + " at " + agent.getHost() + ":" +
-                                agent.getPort();
-                            LOGGER.warn(warnMsg, ex);
-                            break;
-                        }
-                        sleep(delay);
-                    }
-                } while (++i < retries);
-            }
+    public synchronized void send(final Event event)  {
+        if (rpcClient == null) {
+            rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
         }
 
-        throw new AppenderRuntimeException(msg);
-
-    }
-
-    private void sleep(final int delay) {
-        try {
-            Thread.sleep(delay);
-        } catch (final InterruptedException ex) {
-            Thread.currentThread().interrupt();
+        if (rpcClient != null) {
+            try {
+                rpcClient.append(event);
+            } catch (final Exception ex) {
+                rpcClient.close();
+                rpcClient = null;
+                String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
+                    agents[current].getPort();
+                LOGGER.warn(msg, ex);
+                throw new AppenderRuntimeException("No Flume agents are available");
+            }
         }
     }
 
@@ -281,70 +171,51 @@ public class FlumeAvroManager extends Ab
      * @param agents The list of agents to choose from
      * @return The FlumeEventAvroServer.
      */
-    private AvroSourceProtocol connect(final Agent[] agents) {
-        int i = 0;
-        for (final Agent agent : agents) {
-            final AvroSourceProtocol server = connect(agent.getHost(), agent.getPort());
-            if (server != null) {
-                current = i;
-                return server;
+
+    private RpcClient connect(final Agent[] agents, int retries, int connectTimeout, int requestTimeout) {
+        Properties props = new Properties();
+
+        props.put("client.type", agents.length > 1 ? "default_failover" : "default");
+
+        int count = 1;
+        StringBuilder sb = new StringBuilder();
+        for (Agent agent : agents) {
+            if (sb.length() > 0) {
+                sb.append(" ");
             }
-            ++i;
+            String hostName = "host" + count++;
+            props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
+            sb.append(hostName);
         }
-        LOGGER.error("Flume manager " + getName() + " was unable to connect to any agents");
-        return null;
-    }
-
-    private AvroSourceProtocol connect(final String hostname, final int port) {
-        try {
-            if (transceiver == null) {
-                transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
+        props.put("hosts", sb.toString());
+        if (batchSize > 0) {
+            props.put("batch-size", Integer.toString(batchSize));
+        }
+        if (retries > 1) {
+            if (retries > MAX_RECONNECTS) {
+                retries = MAX_RECONNECTS;
             }
-        } catch (final IOException ioe) {
-            LOGGER.error("Unable to create transceiver", ioe);
-            return null;
+            props.put("max-attempts", Integer.toString(retries * agents.length));
         }
-        try {
-            return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver);
-        } catch (final IOException ioe) {
-            LOGGER.error("Unable to create Avro client");
-            return null;
+        if (requestTimeout >= 1000) {
+            props.put("request-timeout", Integer.toString(requestTimeout));
+        }
+        if (connectTimeout >= 1000) {
+            props.put("connect-timeout", Integer.toString(connectTimeout));
         }
+        return RpcClientFactory.getInstance(props);
     }
 
     @Override
     protected void releaseSub() {
-        if (transceiver != null) {
+        if (rpcClient != null) {
             try {
-                transceiver.close();
-            } catch (final IOException ioe) {
-                LOGGER.error("Attempt to clean up Avro transceiver failed", ioe);
-            }
-        }
-        client = null;
-    }
-
-    /**
-     * Thread-safe List management of a batch.
-     */
-    private static class EventList extends ArrayList<AvroFlumeEvent> {
-
-        /**
-         * Generated serial version ID.
-         */
-        private static final long serialVersionUID = -1599817377315957495L;
-
-        public synchronized List<AvroFlumeEvent> addAndGet(final AvroFlumeEvent event, final int batchSize) {
-            super.add(event);
-            if (this.size() >= batchSize) {
-                final List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>();
-                events.addAll(this);
-                clear();
-                return events;
-            } else {
-                return null;
+                rpcClient.close();
+            } catch (final Exception ex) {
+                LOGGER.error("Attempt to close RPC client failed", ex);
             }
         }
+        rpcClient = null;
     }
 
     /**
@@ -354,6 +225,9 @@ public class FlumeAvroManager extends Ab
         private final String name;
         private final Agent[] agents;
         private final int batchSize;
+        private final int retries;
+        private final int conntectTimeout;
+        private final int requestTimeout;
 
         /**
          * Constructor.
@@ -361,10 +235,14 @@ public class FlumeAvroManager extends Ab
          * @param agents The agents.
          * @param batchSize The number of events to include in a batch.
          */
-        public FactoryData(final String name, final Agent[] agents, final int batchSize) {
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
+                           final int connectTimeout, final int requestTimeout) {
             this.name = name;
             this.agents = agents;
             this.batchSize = batchSize;
+            this.retries = retries;
+            this.conntectTimeout = connectTimeout;
+            this.requestTimeout = requestTimeout;
         }
     }
 
@@ -382,7 +260,8 @@ public class FlumeAvroManager extends Ab
         public FlumeAvroManager createManager(final String name, final FactoryData data) {
             try {
 
-                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
+                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
+                    data.conntectTimeout, data.requestTimeout);
             } catch (final Exception ex) {
                 LOGGER.error("Could not create FlumeAvroManager", ex);
             }

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java Tue Apr 16 06:03:44 2013
@@ -16,8 +16,8 @@
  */
 package org.apache.logging.log4j.flume.appender;
 
+import org.apache.flume.Event;
 import org.apache.flume.SourceRunner;
-import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.node.NodeConfiguration;
 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
 import org.apache.logging.log4j.core.appender.ManagerFactory;
@@ -119,7 +119,7 @@ public class FlumeEmbeddedManager extend
     }
 
     @Override
-    public void send(final SimpleEvent event, final int delay, final int retries) {
+    public void send(final Event event) {
         source.send(event);
     }
 

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java Tue Apr 16 06:03:44 2013
@@ -25,6 +25,7 @@ import com.sleepycat.je.EnvironmentConfi
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.StatsConfig;
+import org.apache.flume.Event;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.logging.log4j.LoggingException;
 import org.apache.logging.log4j.core.appender.ManagerFactory;
@@ -66,16 +67,11 @@ public class FlumePersistentManager exte
 
     private final WriterThread worker;
 
-    private final int reconnectionDelay;
-
     private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
 
     private final SecretKey secretKey;
 
-    /**
-     The default reconnection delay (5 minutes).
-     */
-    public static final int DEFAULT_DELAY = 1000 * 60 * 5;
+    private final int delay;
 
     /**
      * Constructor
@@ -85,13 +81,14 @@ public class FlumePersistentManager exte
      * @param database The database to write to.
      */
     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
-                                     final int batchSize, final int reconnectionDelay, final Database database,
+                                     final int batchSize, final int retries, final int connectionTimeout,
+                                     final int requestTimeout, final int delay, final Database database,
                                      SecretKey secretKey) {
-        super(name, shortName, agents, batchSize);
+        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
+        this.delay = delay;
         this.database = database;
         this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
         this.worker.start();
-        this.reconnectionDelay = reconnectionDelay <= 0 ? DEFAULT_DELAY : reconnectionDelay;
         this.secretKey = secretKey;
     }
 
@@ -104,7 +101,8 @@ public class FlumePersistentManager exte
      * @return A FlumeAvroManager.
      */
     public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
-                                                    int batchSize, final int reconnectionDelay, final String dataDir) {
+                                                    int batchSize, final int retries, final int connectionTimeout,
+                                                    final int requestTimeout, final int delay, final String dataDir) {
         if (agents == null || agents.length == 0) {
             throw new IllegalArgumentException("At least one agent is required");
         }
@@ -125,12 +123,12 @@ public class FlumePersistentManager exte
         }
         sb.append("]");
         sb.append(" ").append(dataDirectory);
-        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize,
-            reconnectionDelay, dataDir, properties));
+        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+            connectionTimeout, requestTimeout, delay, dataDir, properties));
     }
 
     @Override
-    public synchronized void send(final SimpleEvent event, int delay, int retries)  {
+    public synchronized void send(final Event event)  {
         if (worker.isShutdown()) {
             throw new LoggingException("Unable to record event");
         }
@@ -181,7 +179,7 @@ public class FlumePersistentManager exte
 
     private void doSend(final SimpleEvent event) {
         LOGGER.debug("Sending event to Flume");
-        super.send(event, 1, 1);
+        super.send(event);
     }
 
     /**
@@ -192,7 +190,10 @@ public class FlumePersistentManager exte
         private final Agent[] agents;
         private final int batchSize;
         private final String dataDir;
-        private final int reconnectionDelay;
+        private final int retries;
+        private final int connectionTimeout;
+        private final int requestTimeout;
+        private final int delay;
         private final Property[] properties;
 
         /**
@@ -202,13 +203,17 @@ public class FlumePersistentManager exte
          * @param batchSize The number of events to include in a batch.
          * @param dataDir The directory for data.
          */
-        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int reconnectionDelay,
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
+                           final int connectionTimeout, final int requestTimeout, final int delay,
                            final String dataDir, final Property[] properties) {
             this.name = name;
             this.agents = agents;
             this.batchSize = batchSize;
             this.dataDir = dataDir;
-            this.reconnectionDelay = reconnectionDelay;
+            this.retries = retries;
+            this.connectionTimeout = connectionTimeout;
+            this.requestTimeout = requestTimeout;
+            this.delay = delay;
             this.properties = properties;
         }
     }
@@ -290,8 +295,8 @@ public class FlumePersistentManager exte
             } catch (Exception ex) {
                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
             }
-            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.reconnectionDelay,
-                database, secretKey);
+            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
+                data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
         }
     }
 
@@ -329,7 +334,7 @@ public class FlumePersistentManager exte
             long lastBatch = System.currentTimeMillis();
             while (!shutdown) {
                 if (database.count() >= batchSize ||
-                    database.count() > 0 && lastBatch + manager.reconnectionDelay > System.currentTimeMillis()) {
+                    database.count() > 0 && lastBatch + manager.delay > System.currentTimeMillis()) {
                     lastBatch = System.currentTimeMillis();
                     try {
                         boolean errors = false;
@@ -356,7 +361,7 @@ public class FlumePersistentManager exte
                                         LOGGER.error("Error sending events", ioe);
                                         break;
                                     }
-                                    for (SimpleEvent event : batch.getEvents()) {
+                                    for (Event event : batch.getEvents()) {
                                         try {
                                             Map<String, String> headers = event.getHeaders();
                                             key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
@@ -397,7 +402,7 @@ public class FlumePersistentManager exte
                             cursor.close();
                         }
                         if (errors) {
-                            Thread.sleep(manager.reconnectionDelay);
+                            Thread.sleep(manager.delay);
                             continue;
                         }
                     } catch (Exception ex) {
@@ -408,7 +413,7 @@ public class FlumePersistentManager exte
                         if (database.count() >= batchSize) {
                             continue;
                         }
-                        queue.poll(manager.reconnectionDelay, TimeUnit.MILLISECONDS);
+                        queue.poll(manager.delay, TimeUnit.MILLISECONDS);
                         LOGGER.debug("WriterThread notified of work");
                     } catch (InterruptedException ie) {
                         LOGGER.warn("WriterThread interrupted, continuing");

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java Tue Apr 16 06:03:44 2013
@@ -17,8 +17,8 @@
 package org.apache.logging.log4j.flume.appender;
 
 import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
-import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractSource;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class Log4jEventSource extends Ab
     }
 
 
-    public void send(final SimpleEvent event) {
+    public void send(final Event event) {
         sourceCounter.incrementAppendReceivedCount();
         sourceCounter.incrementEventReceivedCount();
         try {

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Tue Apr 16 06:03:44 2013
@@ -129,8 +129,8 @@ public class FlumeAppenderTest {
     @Test
     public void testLog4jAvroAppender() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
-            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000",
+            "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -155,8 +155,8 @@ public class FlumeAppenderTest {
     @Test
     public void testStructured() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
-            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000",
+            "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         final Logger eventLogger = (Logger) LogManager.getLogger("EventLogger");
         Assert.assertNotNull(eventLogger);
@@ -193,8 +193,8 @@ public class FlumeAppenderTest {
     @Test
     public void testMultiple() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
-            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000",
+            "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -223,8 +223,8 @@ public class FlumeAppenderTest {
      @Test
     public void testBatch() throws InterruptedException, IOException {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
-            "avro", "false", null, null, null, null, null, "true", "10", null, null, null);
+         final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000",
+             "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "10", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -254,8 +254,8 @@ public class FlumeAppenderTest {
     @Test
     public void testConnectionRefused() {
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
-            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000",
+            "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);
@@ -280,8 +280,8 @@ public class FlumeAppenderTest {
         final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
             Agent.createAgent("localhost", altPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "100", "3",
-            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000",
+            "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         Assert.assertTrue("Appender Not started", avroAppender.isStarted());
         avroLogger.addAppender(avroAppender);
@@ -325,8 +325,8 @@ public class FlumeAppenderTest {
         final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
         final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
                                       Agent.createAgent("localhost", altPort)};
-        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, null, "Avro", null, "100", "3",
-            "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", "Avro", null, "1000",
+            "1000", "1", "1000", "avro", "false", null, null, null, null, null, "true", "1", null, null, null);
         avroAppender.start();
         avroLogger.addAppender(avroAppender);
         avroLogger.setLevel(Level.ALL);

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/default_embedded.xml Tue Apr 16 06:03:44 2013
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration status="warn" name="MyApp" packages="">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true" dataDir="InMemory">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="embedded" dataDir="InMemory">
       <Agent host="localhost" port="${sys:primaryPort}"/>
       <Agent host="localhost" port="${sys:alternatePort}"/>
       <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/resources/embedded.xml Tue Apr 16 06:03:44 2013
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration status="warn" name="MyApp" packages="">
   <appenders>
-    <Flume name="eventLogger" suppressExceptions="false" compress="true" embedded="true">
+    <Flume name="eventLogger" suppressExceptions="false" compress="true" type="embedded">
       <Property name="channels">primary</Property>
       <Property name="channels.primary.type">memory</Property>
       <Property name="sinks">agent1 agent2</Property>

Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Tue Apr 16 06:03:44 2013
@@ -23,6 +23,12 @@
 
   <body>
     <release version="2.0-beta5" date="@TBD@" description="Bug fixes and enhancements">
+      <action issue="LOG4J2-198" dev="rgoers" type="fix">
+        FlumeAvroManager now uses Flume RPCClient.
+      </action>
+      <action issue="LOG4J2-196" dev="rgoers" type="fix">
+        FlumeAvroManager now uses Flume RPCClient.
+      </action>
       <action issue="LOG4J2-207" dev="ggregory" type="fix">
         Use the Maven group ID org.apache.logging.log4j for all artifacts.
       </action>

Modified: logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml?rev=1468306&r1=1468305&r2=1468306&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml (original)
+++ logging/log4j/log4j2/trunk/src/site/xdoc/manual/appenders.xml Tue Apr 16 06:03:44 2013
@@ -100,7 +100,7 @@
             <tr>
               <td>includeLocation</td>
               <td>boolean</td>
-              <td>Extracting location is an expensive operation (it can make  
+              <td>Extracting location is an expensive operation (it can make
               logging 5 - 20 times slower). To improve performance, location is
               not included by default when adding a log event to the queue.
               You can change this by setting includeLocation="true".</td>
@@ -388,7 +388,7 @@
             </ol>
             Usage as an embedded agent will cause the messages to be directly passed to the Flume Channel and then
             control will be immediately returned to the application. All interaction with remote agents will occur
-            asynchronously. Setting the "embedded" attribute to "true" will force the use of the embedded agent. In
+            asynchronously. Setting the "type" attribute to "Embedded" will force the use of the embedded agent. In
             addition, configuring agent properties in the appender configuration will also cause the embedded agent
             to be used.
           </p>
@@ -425,6 +425,11 @@
               <td>When set to true the message body will be compressed using gzip</td>
             </tr>
             <tr>
+              <td>connectTimeout</td>
+              <td>integer</td>
+              <td>The number of milliseconds Flume will wait before timing out the connection.</td>
+            </tr>
+            <tr>
               <td>dataDir</td>
               <td>String</td>
               <td>Directory where the Flume write ahead log should be written. Valid only when embedded is set
@@ -454,6 +459,11 @@
               <td>The Layout to use to format the LogEvent. If no layout is specified RFC5424Layout will be used.</td>
             </tr>
             <tr>
+              <td>maxDelay</td>
+              <td>integer</td>
+              <td>The maximum number of seconds to wait for batchSize events before publishing the batch.</td>
+            </tr>
+            <tr>
               <td>mdcExcludes</td>
               <td>String</td>
               <td>A comma separated list of mdc keys that should be excluded from the FlumeEvent. This is mutually
@@ -497,12 +507,9 @@
               </td>
             </tr>
             <tr>
-              <td>reconnectionDelay</td>
+              <td>requestTimeout</td>
               <td>integer</td>
-              <td>The number of milliseconds the application should wait before trying again to connect to the
-                agent. When type="persistent" is specified this represents the frequency between retries when no
-                remote Flume agents are available (the persistent agent will continue to write to its database in
-                the interim).</td>
+              <td>The number of milliseconds Flume will wait before timing out the request.</td>
             </tr>
 
             <tr>



Mime
View raw message