flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1755: Load balancing RPC client has issues with downed hosts
Date Fri, 07 Dec 2012 15:25:36 GMT
Updated Branches:
  refs/heads/trunk aa549c4f2 -> 2f5438dbd


FLUME-1755: Load balancing RPC client has issues with downed hosts

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2f5438db
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2f5438db
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2f5438db

Branch: refs/heads/trunk
Commit: 2f5438dbdb0040e1d77d0fcc0cc12ba7705b4323
Parents: aa549c4
Author: Brock Noland <brock@apache.org>
Authored: Fri Dec 7 09:24:57 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Fri Dec 7 09:24:57 2012 -0600

----------------------------------------------------------------------
 .../apache/flume/api/LoadBalancingRpcClient.java   |   11 ++--
 .../flume/api/TestLoadBalancingRpcClient.java      |   48 ++++++++++++++-
 2 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2f5438db/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
index 42297c1..f396104 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
@@ -65,9 +65,8 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
 
     while (it.hasNext()) {
       HostInfo host = it.next();
-      RpcClient client;
       try {
-        client = getClient(host);
+        RpcClient client = getClient(host);
         client.append(event);
         eventSent = true;
         break;
@@ -89,8 +88,8 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
 
     while (it.hasNext()) {
       HostInfo host = it.next();
-      RpcClient client = getClient(host);
       try {
+        RpcClient client = getClient(host);
         client.appendBatch(events);
         batchSent = true;
         break;
@@ -180,7 +179,9 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
     selector.setHosts(hosts);
   }
 
-  private synchronized RpcClient getClient(HostInfo info) {
+  private synchronized RpcClient getClient(HostInfo info)
+      throws FlumeException {
+
     String name = info.getReferenceName();
     RpcClient client = clientMap.get(name);
     if (client == null) {
@@ -199,7 +200,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
     return client;
   }
 
-  private RpcClient createClient(String referenceName) {
+  private RpcClient createClient(String referenceName) throws FlumeException {
     Properties props = getClientConfigurationProperties(referenceName);
     return RpcClientFactory.getInstance(props);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2f5438db/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
index 49a69bf..9071734 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
@@ -97,6 +97,52 @@ public class TestLoadBalancingRpcClient {
     }
   }
 
+  /**
+   * Ensure that we can tolerate a host that is completely down.
+   * @throws Exception
+   */
+  @Test
+  public void testTwoHostsOneDead() throws Exception {
+    LOGGER.info("Running testTwoHostsOneDead...");
+    Server s1 = null;
+    RpcClient c1 = null, c2 = null;
+    try {
+      LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler();
+      s1 = RpcTestUtils.startServer(h1);
+      // do not create a 2nd server (assume it's "down")
+
+      Properties p = new Properties();
+      p.put("hosts", "h1 h2");
+      p.put("client.type", "default_loadbalance");
+      p.put("hosts.h1", "127.0.0.1:" + 0); // port 0 should always be closed
+      p.put("hosts.h2", "127.0.0.1:" + s1.getPort());
+
+      // test batch API
+      c1 = RpcClientFactory.getInstance(p);
+      Assert.assertTrue(c1 instanceof LoadBalancingRpcClient);
+
+      for (int i = 0; i < 10; i++) {
+        c1.appendBatch(getBatchedEvent(i));
+      }
+      Assert.assertEquals(10, h1.getAppendBatchCount());
+
+      // test non-batch API
+      c2 = RpcClientFactory.getInstance(p);
+      Assert.assertTrue(c2 instanceof LoadBalancingRpcClient);
+
+      for (int i = 0; i < 10; i++) {
+        c2.append(getEvent(i));
+      }
+      Assert.assertEquals(10, h1.getAppendCount());
+
+
+    } finally {
+      if (s1 != null) s1.close();
+      if (c1 != null) c1.close();
+      if (c2 != null) c2.close();
+    }
+  }
+
   @Test
   public void testTwoHostFailoverBatch() throws Exception {
     Server s1 = null, s2 = null;
@@ -584,7 +630,7 @@ public class TestLoadBalancingRpcClient {
 
   private List<Event> getBatchedEvent(int index) {
     List<Event> result = new ArrayList<Event>();
-    result.add(EventBuilder.withBody(("event: " + index).getBytes()));
+    result.add(getEvent(index));
     return result;
   }
 


Mime
View raw message