flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1306687 - in /incubator/flume/trunk: flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/ flume-ng-core/src/main/java/org/apache/flume/client/avro/ flume-ng-core/src/main/java/org/apache/flume/sink/...
Date Thu, 29 Mar 2012 01:58:53 GMT
Author: arvind
Date: Thu Mar 29 01:58:53 2012
New Revision: 1306687

URL: http://svn.apache.org/viewvc?rev=1306687&view=rev
Log:
FLUME-962. Failover capability for Client SDK.

(Hari Shreedharan via Arvind Prabhakar)

Added:
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java   (with props)
Modified:
    incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java

Modified: incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java (original)
+++ incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java Thu Mar 29 01:58:53 2012
@@ -181,7 +181,7 @@ public class Log4jAppender extends Appen
   @Override
   public void activateOptions() throws FlumeException{
     try {
-      rpcClient = RpcClientFactory.getInstance(hostname, port);
+      rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
     } catch (FlumeException e) {
       String errormsg = "RPC client creation failed! " +
           e.getMessage();

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java Thu Mar 29 01:58:53 2012
@@ -117,7 +117,7 @@ public class AvroCLIClient {
 
     BufferedReader reader = null;
 
-    RpcClient rpcClient = RpcClientFactory.getInstance(hostname, port, BATCH_SIZE);
+    RpcClient rpcClient = RpcClientFactory.getDefaultInstance(hostname, port, BATCH_SIZE);
     try {
       List<Event> eventBuffer = Lists.newArrayList();
 

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Thu Mar 29 01:58:53 2012
@@ -146,7 +146,7 @@ public class AvroSink extends AbstractSi
           "Building RpcClient with hostname:{}, port:{}, batchSize:{}",
           new Object[] { hostname, port, batchSize });
 
-       client = RpcClientFactory.getInstance(hostname, port, batchSize);
+       client = RpcClientFactory.getDefaultInstance(hostname, port, batchSize);
     }
 
   }

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java?rev=1306687&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java Thu Mar 29 01:58:53 2012
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+
+public abstract class AbstractRpcClient implements RpcClient {
+
+  protected Integer batchSize;
+  public static final String HOSTS_PREFIX = "hosts.";
+  public static final String CONFIG_HOSTS = "hosts";
+  public final static Integer DEFAULT_BATCH_SIZE = 100;
+  @Override
+  public int getBatchSize(){
+    return batchSize;
+  }
+  @Override
+  public abstract void append(Event event) throws EventDeliveryException;
+
+  @Override
+  public abstract void appendBatch(List<Event> events)
+      throws EventDeliveryException;
+
+  @Override
+  public abstract boolean isActive();
+
+  @Override
+  public abstract void close() throws FlumeException;
+
+
+  /**
+   * Configure the client using the given properties object.
+   * @param properties
+   * @throws FlumeException if the client can not be configured using this
+   * method, or if the client was already configured once.
+   */
+  protected abstract void configure(Properties properties)
+      throws FlumeException;
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java?rev=1306687&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java Thu Mar 29 01:58:53 2012
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.api;
+
+import java.net.InetSocketAddress;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Avro/Netty implementation of {@link RpcClient} which supports failover. This
+ * takes a list of hostname port combinations and connects to the next available
+ * (looping back to the first) host, from a given list of agents in the order
+ * provided.
+ *
+ *
+ * The properties used to build a FailoverRpcClient must have:
+ * <p><tt>hosts</tt> = <i>alias_for_host1</i> <i>alias_for_host2</i></p> ...
+ * <p><tt>hosts.alias_for_host1</tt> = <i>hostname1:port1</i>. </p>
+ * <p><tt>hosts.alias_for_host2</tt> = <i>hostname2:port2</i>. </p> etc
+ * <p>Optionally it can also have a <p>
+ * <tt>batch-size</tt> = <i>batchSize</i>
+ * <tt>max-attempts</tt> = <i>maxAttempts</i>
+ *
+ * Given a failure, this client will attempt to append to <i>maxAttempts</i>
+ * clients in the <i>hosts</i> list immediately following the failed host
+ * (looping back to the beginning of the <i>hosts</i> list.
+ */
+
+public class FailoverRpcClient extends AbstractRpcClient implements RpcClient {
+  private volatile RpcClient client;
+  private List<InetSocketAddress> hosts;
+  private Integer maxTries;
+  private int lastCheckedhost;
+  private boolean isActive;
+  private static final String CONFIG_MAX_ATTEMPTS = "max-attempts";
+  private static final Logger logger = LoggerFactory
+      .getLogger(FailoverRpcClient.class);
+
+  protected FailoverRpcClient() {
+    lastCheckedhost = -1;
+    client = null;
+  }
+
+  //This function has to be synchronized to establish a happens-before
+  //relationship for different threads that access this object
+  //since shared data structures are created here.
+  private synchronized void configureHosts(Properties properties)
+      throws FlumeException {
+    if(isActive){
+      logger.error("This client was already configured, " +
+          "cannot reconfigure.");
+      throw new FlumeException("This client was already configured, " +
+          "cannot reconfigure.");
+    }
+    hosts = new ArrayList<InetSocketAddress>();
+    String hostNames = properties.getProperty(CONFIG_HOSTS);
+    String[] hostList;
+    if (hostNames != null && !hostNames.isEmpty()) {
+      hostList = hostNames.split("\\s+");
+      for (int i = 0; i < hostList.length; i++) {
+        String hostAndPortStr = properties.getProperty(
+            HOSTS_PREFIX + hostList[i]);
+        // Ignore that host if value is not there
+        if (hostAndPortStr != null) {
+          String[] hostAndPort = hostAndPortStr.split(":");
+          if (hostAndPort.length != 2){
+            logger.error("Invalid host address" + hostAndPortStr);
+            throw new FlumeException("Invalid host address" + hostAndPortStr);
+          }
+          Integer port = null;
+          try {
+            port = Integer.parseInt(hostAndPort[1]);
+          } catch (NumberFormatException e) {
+            logger.error("Invalid port number" + hostAndPortStr, e);
+            throw new FlumeException("Invalid port number" + hostAndPortStr);
+          }
+          hosts.add(new InetSocketAddress(hostAndPort[0].trim(), port));
+        }
+      }
+    }
+    String tries = properties.getProperty(CONFIG_MAX_ATTEMPTS);
+    if (tries == null || tries.isEmpty()){
+      maxTries = hosts.size();
+    } else {
+      try {
+        maxTries = Integer.parseInt(tries);
+      } catch (NumberFormatException e) {
+        maxTries = hosts.size();
+      }
+    }
+    Integer batchSize;
+    try {
+      batchSize = Integer.parseInt(properties.getProperty("batch-size"));
+      if (batchSize == null){
+        logger.warn("No batch size found - assigning default size");
+        batchSize = DEFAULT_BATCH_SIZE;
+      }
+    } catch (NumberFormatException e) {
+      logger.warn("Batch Size {} is invalid - assigning default size",
+          properties.getProperty("batch-size"), e);
+      batchSize = DEFAULT_BATCH_SIZE;
+    }
+    isActive = true;
+  }
+
+  /**
+   * Get the maximum number of "failed" hosts the client will try to establish
+   * connection to before throwing an exception. Failed = was able to set up a
+   * connection, but failed / returned error when the client tried to send data,
+   *
+   * @return The maximum number of failed retries
+   */
+  protected Integer getMaxTries() {
+    return maxTries;
+  }
+
+  private synchronized RpcClient getClient() {
+    if (client == null || !this.client.isActive()) {
+      client = getNextClient();
+      return client;
+    } else {
+      return client;
+    }
+  }
+
+  /**
+   * Tries to append an event to the currently connected client. If it cannot
+   * send the event, it tries to send to next available host
+   *
+   * @param event The event to be appended.
+   *
+   * @throws EventDeliveryException
+   */
+  @Override
+  public void append(Event event) throws EventDeliveryException {
+    //Why a local variable rather than just calling getClient()?
+    //If we get an EventDeliveryException, we need to call close on
+    //that specific client, getClient in this case, will get us
+    //the next client - leaving a resource leak.
+    RpcClient localClient = null;
+    synchronized (this) {
+      if (!isActive) {
+        logger.error("Attempting to append to an already closed client.");
+        throw new EventDeliveryException(
+            "Attempting to append to an already closed client.");
+      }
+    }
+    // Sit in an infinite loop and try to append!
+    int tries = 0;
+    while (tries < maxTries) {
+      try {
+        tries++;
+        localClient = getClient();
+        localClient.append(event);
+        return;
+      } catch (EventDeliveryException e) {
+        // Could not send event through this client, try to pick another client.
+        logger.warn("Client failed. Exception follows: ", e);
+        localClient.close();
+        localClient = null;
+      } catch (Exception e2) {
+        logger.error("Failed to send event: ", e2);
+        throw new EventDeliveryException(
+            "Failed to send event. Exception follows: ", e2);
+      }
+    }
+    logger.error("Tried many times, could not send event.");
+    throw new EventDeliveryException("Failed to send the event!");
+  }
+
+  /**
+   * Tries to append a list of events to the currently connected client. If it
+   * cannot send the event, it tries to send to next available host
+   *
+   * @param events The events to be appended.
+   *
+   * @throws EventDeliveryException
+   */
+  @Override
+  public void appendBatch(List<Event> events)
+      throws EventDeliveryException {
+    RpcClient localClient = null;
+    synchronized (this) {
+      if (!isActive) {
+        logger.error("Attempting to append to an already closed client.");
+        throw new EventDeliveryException(
+            "Attempting to append to an already closed client!");
+      }
+    }
+    int tries = 0;
+    while (tries < maxTries) {
+      try {
+        tries++;
+        localClient = getClient();
+        localClient.appendBatch(events);
+        return;
+      } catch (EventDeliveryException e) {
+        // Could not send event through this client, try to pick another client.
+        logger.warn("Client failed. Exception follows: ", e);
+        localClient.close();
+        localClient = null;
+      } catch (Exception e1) {
+        logger.error("No clients active: ", e1);
+        throw new EventDeliveryException("No clients currently active. " +
+            "Exception follows: ", e1);
+      }
+    }
+    logger.error("Tried many times, could not send event.");
+    throw new EventDeliveryException("Failed to send the event!");
+  }
+
+  // Returns false if and only if this client has been closed explicitly.
+  // Should we check if any clients are active, if none are then return false?
+  // This method has to be lightweight, so not checking if hosts are active.
+  @Override
+  public synchronized boolean isActive() {
+    return isActive;
+  }
+
+  /**
+   * Close the connection. This function is safe to call over and over.
+   */
+  @Override
+  public synchronized void close() throws FlumeException {
+    if (client != null) {
+      client.close();
+      isActive = false;
+    }
+  }
+
+  /**
+   * Get the last socket address this client connected to. No guarantee this
+   * will be the next it will connect to. If this host is down, it will connect
+   * to another host. To be used only from the unit tests!
+   * @return The last socket address this client connected to
+   */
+  protected InetSocketAddress getLastConnectedServerAddress() {
+    return hosts.get(lastCheckedhost);
+  }
+
+  private RpcClient getNextClient() throws FlumeException {
+    lastCheckedhost =
+        (lastCheckedhost == (hosts.size() - 1)) ? -1 : lastCheckedhost;
+    RpcClient localClient = null;
+    int limit = hosts.size();
+    //Try to connect to all hosts again, till we find one available
+    for (int count = lastCheckedhost + 1; count < limit; count++) {
+      try {
+        localClient =
+            RpcClientFactory.getDefaultInstance(hosts.get(count).getHostName(),
+                hosts.get(count).getPort());
+        lastCheckedhost = count;
+        return localClient;
+      } catch (FlumeException e) {
+        logger.info("Could not connect to " + hosts.get(count).getHostName()
+            +":"+ String.valueOf(hosts.get(count).getPort()), e);
+        continue;
+      }
+    }
+    for(int count = 0; count <= lastCheckedhost; count++){
+      try {
+        localClient =
+            RpcClientFactory.getDefaultInstance(hosts.get(count).getHostName(),
+                hosts.get(count).getPort());
+        lastCheckedhost = count;
+        return localClient;
+      } catch (FlumeException e) {
+        logger.info("Could not connect to " + hosts.get(count).getHostName()
+            +":"+ String.valueOf(hosts.get(count).getPort()), e);
+        continue;
+      }
+    }
+    if (localClient == null) {
+      lastCheckedhost = -1;
+      logger.error("No active client found.");
+      throw new FlumeException("No active client.");
+    }
+    // This return should never be reached!
+    return localClient;
+  }
+
+  @Override
+  public void configure(Properties properties) throws FlumeException {
+    this.configureHosts(properties);
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java Thu Mar 29 01:58:53 2012
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Properties;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -41,13 +42,16 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Avro/Netty implementation of {@link RpcClient}.
  * The connections are intended to be opened before clients are given access so
  * that the object cannot ever be in an inconsistent when exposed to users.
  */
-public class NettyAvroRpcClient implements RpcClient {
+public class NettyAvroRpcClient extends AbstractRpcClient
+implements RpcClient {
 
   private final ReentrantLock stateLock = new ReentrantLock();
 
@@ -62,34 +66,43 @@ public class NettyAvroRpcClient implemen
    */
   private ConnState connState;
 
-  private final String hostname;
-  private final Integer port;
-  private final Integer batchSize;
+  private InetSocketAddress address;
 
   private Transceiver transceiver;
   private AvroSourceProtocol.Callback avroClient;
+  private static final Logger logger = LoggerFactory
+      .getLogger(NettyAvroRpcClient.class);
 
   /**
-   * This constructor is intended to be called from {@link AvroClientBuilder}.
-   * @param hostname The destination hostname
-   * @param port The destination port
+   * This constructor is intended to be called from {@link RpcClientFactory}.
+   * @param address The InetSocketAddress to connect to
    * @param batchSize Maximum number of Events to accept in appendBatch()
    */
-  private NettyAvroRpcClient(String hostname, Integer port, Integer batchSize) {
-
-    if (hostname == null) throw new NullPointerException("hostname is null");
-    if (port == null) throw new NullPointerException("port is null");
-    if (batchSize == null) throw new NullPointerException("batchSize is null");
-
-    this.hostname = hostname;
-    this.port = port;
+  protected NettyAvroRpcClient(InetSocketAddress address, Integer batchSize)
+      throws FlumeException{
+    if (address == null){
+      logger.error("InetSocketAddress is null, cannot create client.");
+      throw new NullPointerException("InetSocketAddress is null");
+    }
+    this.address = address;
+    if(batchSize == null || batchSize == 0){
+      this.batchSize = DEFAULT_BATCH_SIZE;
+    }
+    else{
     this.batchSize = batchSize;
+    }
+    connect();
+  }
 
-    setState(ConnState.INIT);
+  /**
+   * This constructor is intended to be called from {@link RpcClientFactory}.
+   * A call to this constructor should be followed by call to configure().
+   */
+  protected NettyAvroRpcClient(){
   }
 
   /**
-   * This method should only be invoked by the Builder
+   * This method should only be invoked by the build function
    * @throws FlumeException
    */
   private void connect() throws FlumeException {
@@ -104,13 +117,12 @@ public class NettyAvroRpcClient implemen
    */
   private void connect(long timeout, TimeUnit tu) throws FlumeException {
     try {
-      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port),
-          tu.toMillis(timeout));
+      transceiver = new NettyTransceiver(this.address, tu.toMillis(timeout));
       avroClient =
           SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
           transceiver);
-
     } catch (IOException ex) {
+      logger.error("RPC connection error :" , ex);
       throw new FlumeException("RPC connection error. Exception follows.", ex);
     }
 
@@ -122,6 +134,7 @@ public class NettyAvroRpcClient implemen
     try {
       transceiver.close();
     } catch (IOException ex) {
+      logger.error("Error closing transceiver. " , ex);
       throw new FlumeException("Error closing transceiver. Exception follows.",
           ex);
     } finally {
@@ -131,11 +144,6 @@ public class NettyAvroRpcClient implemen
   }
 
   @Override
-  public int getBatchSize() {
-    return batchSize;
-  }
-
-  @Override
   public void append(Event event) throws EventDeliveryException {
     try {
       append(event, DEFAULT_REQUEST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -160,6 +168,7 @@ public class NettyAvroRpcClient implemen
       avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));
       avroClient.append(avroEvent, callFuture);
     } catch (IOException ex) {
+      logger.error("RPC request IO exception. " , ex);
       throw new EventDeliveryException("RPC request IO exception. " +
           "Exception follows.", ex);
     }
@@ -204,6 +213,7 @@ public class NettyAvroRpcClient implemen
       try {
         avroClient.appendBatch(avroEvents, callFuture);
       } catch (IOException ex) {
+        logger.error("RPC request IO exception. " , ex);
         throw new EventDeliveryException("RPC request IO exception. " +
             "Exception follows.", ex);
       }
@@ -225,18 +235,23 @@ public class NettyAvroRpcClient implemen
     try {
       Status status = callFuture.get(timeout, tu);
       if (status != Status.OK) {
+        logger.error("Status (" + status + ") is not OK");
         throw new EventDeliveryException("Status (" + status + ") is not OK");
       }
     } catch (CancellationException ex) {
+      logger.error("RPC future was cancelled." , ex);
       throw new EventDeliveryException("RPC future was cancelled." +
           " Exception follows.", ex);
     } catch (ExecutionException ex) {
+      logger.error("Exception thrown from remote handler." , ex);
       throw new EventDeliveryException("Exception thrown from remote handler." +
           " Exception follows.", ex);
     } catch (TimeoutException ex) {
+      logger.error("RPC request timed out." , ex);
       throw new EventDeliveryException("RPC request timed out." +
           " Exception follows.", ex);
     } catch (InterruptedException ex) {
+      logger.error("RPC request interrupted." , ex);
       Thread.currentThread().interrupt();
       throw new EventDeliveryException("RPC request interrupted." +
           " Exception follows.", ex);
@@ -255,6 +270,7 @@ public class NettyAvroRpcClient implemen
     stateLock.lock();
     try {
       if (connState == ConnState.DEAD && connState != newState) {
+        logger.error("Cannot transition from CLOSED state.");
         throw new IllegalStateException("Cannot transition from CLOSED state.");
       }
       connState = newState;
@@ -271,6 +287,7 @@ public class NettyAvroRpcClient implemen
     try {
       ConnState curState = connState;
       if (curState != ConnState.READY) {
+        logger.error("RPC failed, client in an invalid state: " + curState);
         throw new EventDeliveryException("RPC failed, client in an invalid " +
             "state: " + curState);
       }
@@ -306,90 +323,70 @@ public class NettyAvroRpcClient implemen
     INIT, READY, DEAD
   }
 
-  /**
-   * <p>Builder class used to construct {@link NettyAvroRpcClient} objects.</p>
-   *
-   * <p><strong>Note:</strong> It is recommended for applications to construct
-   * {@link RpcClient} instances using the {@link RpcClientFactory} class.</p>
-   */
-  protected static class Builder {
-
-    protected static final int DEFAULT_BATCH_SIZE = 100;
-
-    private String hostname;
-    private Integer port;
-    private Integer batchSize;
-
-    public Builder() {
-      batchSize = DEFAULT_BATCH_SIZE;
-    }
 
     /**
-     * Hostname to connect to (required)
-     *
-     * @param hostname
-     * @return {@code this}
+   * <p>
+   * Configure the actual client using the properties.
+   * <tt>properties</tt> should have at least 2 params:
+   * <p><tt>hosts</tt> = <i>alias_for_host</i></p>
+   * <p><tt>alias_for_host</tt> = <i>hostname:port</i>. </p>
+   * Only the first host is added, rest are discarded.</p>
+   * <p>Optionally it can also have a <p>
+   * <tt>batch-size</tt> = <i>batchSize</i>
+   * @param properties The properties to instantiate the client with.
+   * @return
      */
-    public Builder hostname(String hostname) {
-      if (hostname == null) {
-        throw new NullPointerException("hostname is null");
-      }
-
-      this.hostname = hostname;
-      return this;
+  @Override
+  public synchronized void configure(Properties properties)
+      throws FlumeException {
+    stateLock.lock();
+    try{
+      if(connState == ConnState.READY || connState == ConnState.DEAD){
+        logger.error("This client was already configured, " +
+            "cannot reconfigure.");
+        throw new FlumeException("This client was already configured, " +
+            "cannot reconfigure.");
     }
-
-    /**
-     * Port to connect to (required)
-     *
-     * @param port
-     * @return {@code this}
-     */
-    public Builder port(Integer port) {
-      if (port == null) {
-        throw new NullPointerException("port is null");
+    } finally {
+      stateLock.unlock();
       }
-
-      this.port = port;
-      return this;
-    }
-
-    /**
-     * Maximum number of {@linkplain Event events} that can be processed in a
-     * batch operation. (optional)<br>
-     * Default: 100
-     *
-     * @param batchSize
-     * @return {@code this}
-     */
-    public Builder batchSize(Integer batchSize) {
-      if (batchSize == null) {
-        throw new NullPointerException("batch size is null");
+    String strbatchSize = properties.getProperty("batch-size");
+    batchSize = DEFAULT_BATCH_SIZE;
+    if (strbatchSize != null && !strbatchSize.isEmpty()) {
+      try {
+        batchSize = Integer.parseInt(strbatchSize);
+      } catch (NumberFormatException e) {
+        logger.warn("Batchsize is not valid for RpcClient: " + strbatchSize +
+            ".Default value assigned.", e);
       }
-
-      this.batchSize = batchSize;
-      return this;
     }
-
-    /**
-     * Construct the object
-     * @return Active RPC client
-     * @throws FlumeException
-     */
-    public NettyAvroRpcClient build() throws FlumeException {
-      // validate the required fields
-      if (hostname == null) throw new NullPointerException("hostname is null");
-      if (port == null) throw new NullPointerException("port is null");
-      if (batchSize == null) {
-        throw new NullPointerException("batch size is null");
-      }
-
-      NettyAvroRpcClient client =
-          new NettyAvroRpcClient(hostname, port, batchSize);
-      client.connect();
-
-      return client;
+    String hostNames = properties.getProperty(CONFIG_HOSTS);
+    String[] hosts = null;
+    if (hostNames != null && !hostNames.isEmpty()) {
+      hosts = hostNames.split("\\s+");
+    } else {
+      logger.error("Hosts list is invalid: "+ hostNames);
+      throw new FlumeException("Hosts list is invalid: "+ hostNames);
+    }
+    String host = properties.getProperty(HOSTS_PREFIX+hosts[0]);
+    if (host == null || host.isEmpty()) {
+      logger.error("Host not found: " + hosts[0]);
+      throw new FlumeException("Host not found: " + hosts[0]);
+    }
+    String[] hostAndPort = host.split(":");
+    if (hostAndPort.length != 2){
+      logger.error("Invalid hostname, " + hosts[0]);
+      throw new FlumeException("Invalid hostname, " + hosts[0]);
+    }
+    Integer port = null;
+    try {
+      port = Integer.parseInt(hostAndPort[1]);
+    } catch (NumberFormatException e) {
+      logger.error("Invalid Port:" + hostAndPort[1], e);
+      throw new FlumeException("Invalid Port:" + hostAndPort[1], e);
     }
+    this.address = new InetSocketAddress(hostAndPort[0], port);
+    this.connect();
   }
 
 }

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java Thu Mar 29 01:58:53 2012
@@ -16,6 +16,7 @@
 package org.apache.flume.api;
 
 import java.util.List;
+
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -103,4 +104,5 @@ public interface RpcClient {
    */
   public void close() throws FlumeException;
 
+
 }

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java Thu Mar 29 01:58:53 2012
@@ -15,6 +15,9 @@
  */
 package org.apache.flume.api;
 
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
 import org.apache.flume.FlumeException;
 
 /**
@@ -22,16 +25,75 @@ import org.apache.flume.FlumeException;
  */
 public class RpcClientFactory {
 
+  private final static String CONF_CLIENT_TYPE = "client.type";
+  /**
+   * Returns an instance of {@link RpcClient}, optionally with failover.
+   * To create a failover client, the properties object should have a
+   * property <tt>client.type</tt> which has the value "failover". The client
+   * connects to hosts specified by <tt>hosts</tt> property in given properties.
+   *
+   * @see org.apache.flume.api.FailoverRpcClient
+   * <p>
+   * If no <tt>client.type</tt> is specified, a default client that connects to
+   * single host at a given port is created.(<tt>type</tt> can also simply be
+   * <tt>netty</tt> for the default client).
+   *
+   * @see org.apache.flume.api.NettyAvroClient
+   *
+   * @param properties The properties to instantiate the client with.
+   * @throws FlumeException
+   */
+
+  @SuppressWarnings("unchecked")
+  public static RpcClient getInstance(Properties properties)
+      throws FlumeException {
+    String type = null;
+    type = properties.getProperty(CONF_CLIENT_TYPE);
+    if (type == null || type.isEmpty()) {
+      type = ClientType.DEFAULT.getClientClassName();
+    }
+    Class<? extends AbstractRpcClient> clazz;
+    AbstractRpcClient client;
+    try {
+      String clientClassType = type;
+      ClientType clientType = null;
+      try{
+        clientType = ClientType.valueOf(type.toUpperCase());
+      } catch (IllegalArgumentException e){
+        clientType = ClientType.OTHER;
+      }
+      if (!clientType.equals(ClientType.OTHER)){
+        clientClassType = clientType.getClientClassName();
+      }
+      clazz =
+          (Class<? extends AbstractRpcClient>) Class.forName(clientClassType);
+    } catch (ClassNotFoundException e) {
+      throw new FlumeException("No such client!", e);
+    }
+
+    try {
+      client = clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new FlumeException("Cannot instantiate client. " +
+          "Exception follows:", e);
+    } catch (IllegalAccessException e) {
+      throw new FlumeException("Cannot instantiate client. " +
+          "Exception follows:", e);
+    }
+    client.configure(properties);
+    return client;
+
+  }
+
   /**
    * Returns an instance of {@link RpcClient} connected to the specified
    * {@code hostname} and {@code port}.
    * @throws FlumeException
    */
-  public static RpcClient getInstance(String hostname, Integer port)
+  public static RpcClient getDefaultInstance(String hostname, Integer port)
       throws FlumeException {
+    return getDefaultInstance(hostname, port, 0);
 
-    return new NettyAvroRpcClient.Builder()
-        .hostname(hostname).port(port).build();
   }
 
   /**
@@ -39,11 +101,28 @@ public class RpcClientFactory {
    * {@code hostname} and {@code port} with the specified {@code batchSize}.
    * @throws FlumeException
    */
-  public static RpcClient getInstance(String hostname, Integer port,
+  public static RpcClient getDefaultInstance(String hostname, Integer port,
       Integer batchSize) throws FlumeException {
+    NettyAvroRpcClient client = new NettyAvroRpcClient(
+        new InetSocketAddress(hostname, port), batchSize);
+    return client;
 
-    return new NettyAvroRpcClient.Builder()
-        .hostname(hostname).port(port).batchSize(batchSize).build();
   }
 
+  private static enum ClientType {
+    OTHER(null),
+    DEFAULT("org.apache.flume.api.NettyAvroRpcClient"),
+    DEFAULT_FAILOVER("org.apache.flume.api.FailoverRpcClient");
+
+    private final String clientClassName;
+
+    private ClientType(String className) {
+      this.clientClassName = className;
+  }
+
+    protected String getClientClassName() {
+      return this.clientClassName;
+    }
+
+  }
 }

Modified: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java Thu Mar 29 01:58:53 2012
@@ -99,8 +99,8 @@ public class RpcTestUtils {
    * Helper method for constructing a Netty RPC client that talks to localhost.
    */
   public static NettyAvroRpcClient getStockLocalClient(int port) {
-    NettyAvroRpcClient client = new NettyAvroRpcClient.Builder()
-        .hostname(localhost).port(port).build();
+    NettyAvroRpcClient client =
+       new NettyAvroRpcClient(new InetSocketAddress("localhost", port), 0);
 
     return client;
   }
@@ -108,11 +108,11 @@ public class RpcTestUtils {
   /**
    * Start a NettyServer, wait a moment for it to spin up, and return it.
    */
-  public static Server startServer(AvroSourceProtocol handler) {
+  public static Server startServer(AvroSourceProtocol handler, int port) {
     Responder responder = new SpecificResponder(AvroSourceProtocol.class,
         handler);
     Server server = new NettyServer(responder,
-        new InetSocketAddress(localhost, 0));
+        new InetSocketAddress(localhost, port));
     server.start();
     logger.info("Server started on hostname: {}, port: {}",
         new Object[] { localhost, Integer.toString(server.getPort()) });
@@ -129,6 +129,10 @@ public class RpcTestUtils {
     return server;
   }
 
+  public static Server startServer(AvroSourceProtocol handler) {
+    return startServer(handler, 0);
+  }
+
   /**
    * Request that the specified Server stop, and attempt to wait for it to exit.
    * @param server A running NettyServer

Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java?rev=1306687&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java Thu Mar 29 01:58:53 2012
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flume.api;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.avro.ipc.Server;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFailoverRpcClient {
+  /**
+   * Test a bunch of servers closing the one we are writing to and bringing
+   * another one back online.
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+
+  @Test
+  public void testFailover() throws FlumeException, EventDeliveryException {
+    FailoverRpcClient client = null;
+    Server server1 = RpcTestUtils.startServer(new OKAvroHandler());
+    Server server2 = RpcTestUtils.startServer(new OKAvroHandler());
+    Server server3 = RpcTestUtils.startServer(new OKAvroHandler());
+    Properties props = new Properties();
+    int s1Port = server1.getPort();
+    int s2Port = server2.getPort();
+    int s3Port = server3.getPort();
+    props.put("client.type", "default_failover");
+    props.put("hosts", "host1 host2 host3");
+    props.put("hosts.host1", " localhost:" + String.valueOf(s1Port));
+    props.put("hosts.host2", " localhost:" + String.valueOf(s2Port));
+    props.put("hosts.host3", " localhost:" + String.valueOf(s3Port));
+    client = (FailoverRpcClient) RpcClientFactory.getInstance(props);
+    List<Event> events = new ArrayList<Event>();
+    for (int i = 0; i < 50; i++) {
+      events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+    }
+    client.appendBatch(events);
+    Assert.assertEquals(client.getLastConnectedServerAddress(),
+        new InetSocketAddress("localhost", server1.getPort()));
+    server1.close();
+    events = new ArrayList<Event>();
+    for (int i = 0; i < 50; i++) {
+      events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+    }
+    client.appendBatch(events);
+    Assert.assertEquals(new InetSocketAddress("localhost", server2.getPort()),
+        client.getLastConnectedServerAddress());
+    server2.close();
+    client.append(EventBuilder.withBody("Had a sandwich?",
+        Charset.forName("UTF8")));
+    Assert.assertEquals(new InetSocketAddress("localhost", server3.getPort()),
+        client.getLastConnectedServerAddress());
+    // Bring server 2 back.
+    Server server4 = RpcTestUtils.startServer(new OKAvroHandler(), s2Port);
+    server3.close();
+    events = new ArrayList<Event>();
+    for (int i = 0; i < 50; i++) {
+      events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+    }
+    client.appendBatch(events);
+    Assert.assertEquals(new InetSocketAddress("localhost", s2Port),
+        client.getLastConnectedServerAddress());
+
+    Server server5 = RpcTestUtils.startServer(new OKAvroHandler(), s1Port);
+    // Make sure we are still talking to server 4
+
+    client
+    .append(EventBuilder.withBody("Had a mango?", Charset.forName("UTF8")));
+    Assert.assertEquals(new InetSocketAddress("localhost", s2Port),
+        client.getLastConnectedServerAddress());
+    server4.close();
+
+    events = new ArrayList<Event>();
+    for (int i = 0; i < 50; i++) {
+      events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+    }
+    client.appendBatch(events);
+    Assert.assertEquals(new InetSocketAddress("localhost", s1Port),
+        client.getLastConnectedServerAddress());
+    server5.close();
+    Server server6 = RpcTestUtils.startServer(new OKAvroHandler(), s1Port);
+    client
+    .append(EventBuilder.withBody("Had a whole watermelon?",
+        Charset.forName("UTF8")));
+    Assert.assertEquals(new InetSocketAddress("localhost", s1Port),
+        client.getLastConnectedServerAddress());
+
+    server6.close();
+    Server server7 = RpcTestUtils.startServer(new OKAvroHandler(), s3Port);
+    events = new ArrayList<Event>();
+    for (int i = 0; i < 50; i++) {
+      events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+    }
+    client.appendBatch(events);
+    Assert.assertEquals(new InetSocketAddress("localhost", s3Port),
+        client.getLastConnectedServerAddress());
+    server7.close();
+  }
+
+  /**
+   * Try writing to some servers and then kill them all.
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test(
+      expected = EventDeliveryException.class)
+  public void testFailedServers() throws FlumeException, EventDeliveryException {
+    FailoverRpcClient client = null;
+    Server server1 = RpcTestUtils.startServer(new OKAvroHandler());
+    Server server2 = RpcTestUtils.startServer(new OKAvroHandler());
+    Server server3 = RpcTestUtils.startServer(new OKAvroHandler());
+    Properties props = new Properties();
+    props.put("client.type", "default_failover");
+
+    props.put("hosts", "host1 host2 host3");
+    props.put("hosts.host1", "localhost:" + String.valueOf(server1.getPort()));
+    props.put("hosts.host2", "localhost:" + String.valueOf(server2.getPort()));
+    props.put("hosts.host3", " localhost:" + String.valueOf(server3.getPort()));
+    client = (FailoverRpcClient) RpcClientFactory.getInstance(props);
+    List<Event> events = new ArrayList<Event>();
+    for (int i = 0; i < 50; i++) {
+      events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+    }
+    client.appendBatch(events);
+    server1.close();
+    server2.close();
+    server3.close();
+    events = new ArrayList<Event>();
+    for (int i = 0; i < 50; i++) {
+      events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+    }
+    client.appendBatch(events);
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java Thu Mar 29 01:58:53 2012
@@ -15,6 +15,7 @@
  */
 package org.apache.flume.api;
 
+import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
@@ -74,8 +75,8 @@ public class TestNettyAvroRpcClient {
    */
   @Test(expected=FlumeException.class)
   public void testUnableToConnect() throws FlumeException {
-    NettyAvroRpcClient client = new NettyAvroRpcClient.Builder()
-        .hostname(localhost).port(1).build();
+    NettyAvroRpcClient client = new NettyAvroRpcClient(
+        new InetSocketAddress(localhost, 1), 0);
   }
 
   /**
@@ -91,9 +92,8 @@ public class TestNettyAvroRpcClient {
     NettyAvroRpcClient client = null;
     Server server = RpcTestUtils.startServer(new OKAvroHandler());
     try {
-      client = new NettyAvroRpcClient.Builder()
-          .hostname(localhost).port(server.getPort()).batchSize(batchSize)
-          .build();
+      client = new NettyAvroRpcClient(
+          new InetSocketAddress(localhost, server.getPort()), batchSize);
 
       // send one more than the batch size
       List<Event> events = new ArrayList<Event>();

Modified: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java?rev=1306687&r1=1306686&r2=1306687&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java Thu Mar 29 01:58:53 2012
@@ -18,6 +18,8 @@ package org.apache.flume.api;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
+
 import org.apache.avro.ipc.Server;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -41,7 +43,7 @@ public class TestRpcClientFactory {
     RpcClient client = null;
     Server server = RpcTestUtils.startServer(new OKAvroHandler());
     try {
-      client = RpcClientFactory.getInstance(localhost, server.getPort());
+      client = RpcClientFactory.getDefaultInstance(localhost, server.getPort());
       client.append(EventBuilder.withBody("wheee!!!", Charset.forName("UTF8")));
     } finally {
       RpcTestUtils.stopServer(server);
@@ -56,7 +58,7 @@ public class TestRpcClientFactory {
     RpcClient client = null;
     Server server = RpcTestUtils.startServer(new OKAvroHandler());
     try {
-      client = RpcClientFactory.getInstance(localhost, server.getPort(),
+      client = RpcClientFactory.getDefaultInstance(localhost, server.getPort(),
           batchSize);
 
       List<Event> events = new ArrayList<Event>();
@@ -70,6 +72,29 @@ public class TestRpcClientFactory {
     }
   }
 
+  @Test
+  public void testPropertiesBatchAppend() throws FlumeException,
+      EventDeliveryException {
+    int batchSize = 7;
+    RpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    try {
+      Properties p = new Properties();
+      p.put("hosts", "host1");
+      p.put("hosts.host1", localhost + ":" + String.valueOf(server.getPort()));
+      p.put("batch-size", String.valueOf(batchSize));
+      client = RpcClientFactory.getInstance(p);
+      List<Event> events = new ArrayList<Event>();
+      for (int i = 0; i < batchSize; i++) {
+        events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+      }
+      client.appendBatch(events);
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
   // we are supposed to handle this gracefully
   @Test
   public void testTwoParamBatchAppendOverflow() throws FlumeException,
@@ -77,7 +102,7 @@ public class TestRpcClientFactory {
     RpcClient client = null;
     Server server = RpcTestUtils.startServer(new OKAvroHandler());
     try {
-      client = RpcClientFactory.getInstance(localhost, server.getPort());
+      client = RpcClientFactory.getDefaultInstance(localhost, server.getPort());
       int batchSize = client.getBatchSize();
       int moreThanBatch = batchSize + 1;
       List<Event> events = new ArrayList<Event>();



Mime
View raw message