flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1823. LoadBalancingRpcClient method must throw exception if it is called after close is called.
Date Wed, 08 May 2013 23:03:08 GMT
Updated Branches:
  refs/heads/flume-1.4 a0127f693 -> b2b45edda


FLUME-1823. LoadBalancingRpcClient method must throw exception if it is called after close
is called.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b2b45edd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b2b45edd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b2b45edd

Branch: refs/heads/flume-1.4
Commit: b2b45eddad92baae6c2d2958b98b4311217f242c
Parents: a0127f6
Author: Mike Percy <mpercy@apache.org>
Authored: Wed May 8 16:02:37 2013 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Wed May 8 16:02:57 2013 -0700

----------------------------------------------------------------------
 .../apache/flume/api/LoadBalancingRpcClient.java   |   19 +++++--
 .../flume/api/TestLoadBalancingRpcClient.java      |   41 +++++++++++++++
 2 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b2b45edd/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
index f396104..e5fcc36 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
@@ -57,9 +57,11 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
   private HostSelector selector;
   private Map<String, RpcClient> clientMap;
   private Properties configurationProperties;
+  private volatile boolean isOpen = false;
 
   @Override
   public void append(Event event) throws EventDeliveryException {
+    throwIfClosed();
     boolean eventSent = false;
     Iterator<HostInfo> it = selector.createHostIterator();
 
@@ -83,6 +85,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
 
   @Override
   public void appendBatch(List<Event> events) throws EventDeliveryException {
+    throwIfClosed();
     boolean batchSent = false;
     Iterator<HostInfo> it = selector.createHostIterator();
 
@@ -106,13 +109,18 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
 
   @Override
   public boolean isActive() {
-    // This client is always active and does not need to be replaced.
-    // Internally it will test the delegates and replace them where needed.
-    return true;
+    return isOpen;
+  }
+
+  private void throwIfClosed() throws EventDeliveryException {
+    if (!isOpen) {
+      throw new EventDeliveryException("Rpc Client is closed");
+    }
   }
 
   @Override
   public void close() throws FlumeException {
+    isOpen = false;
     synchronized (this) {
       Iterator<String> it = clientMap.keySet().iterator();
       while (it.hasNext()) {
@@ -177,11 +185,12 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
     }
 
     selector.setHosts(hosts);
+    isOpen = true;
   }
 
   private synchronized RpcClient getClient(HostInfo info)
-      throws FlumeException {
-
+      throws FlumeException, EventDeliveryException {
+    throwIfClosed();
     String name = info.getReferenceName();
     RpcClient client = clientMap.get(name);
     if (client == null) {

http://git-wip-us.apache.org/repos/asf/flume/blob/b2b45edd/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
index 9071734..5d6828b 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
@@ -97,6 +97,47 @@ public class TestLoadBalancingRpcClient {
     }
   }
 
+  // This will fail without FLUME-1823
+  @Test(expected = EventDeliveryException.class)
+  public void testTwoHostFailoverThrowAfterClose() throws Exception {
+    Server s1 = null, s2 = null;
+    RpcClient c = null;
+    try{
+      LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler();
+      LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler();
+
+      s1 = RpcTestUtils.startServer(h1);
+      s2 = RpcTestUtils.startServer(h2);
+
+      Properties p = new Properties();
+      p.put("hosts", "h1 h2");
+      p.put("client.type", "default_loadbalance");
+      p.put("hosts.h1", "127.0.0.1:" + s1.getPort());
+      p.put("hosts.h2", "127.0.0.1:" + s2.getPort());
+
+      c = RpcClientFactory.getInstance(p);
+      Assert.assertTrue(c instanceof LoadBalancingRpcClient);
+
+      for (int i = 0; i < 100; i++) {
+        if (i == 20) {
+          h2.setFailed();
+        } else if (i == 40) {
+          h2.setOK();
+        }
+        c.append(getEvent(i));
+      }
+
+      Assert.assertEquals(60, h1.getAppendCount());
+      Assert.assertEquals(40, h2.getAppendCount());
+      if (c != null) c.close();
+      c.append(getEvent(3));
+      Assert.fail();
+    } finally {
+      if (s1 != null) s1.close();
+      if (s2 != null) s2.close();
+    }
+  }
+
   /**
    * Ensure that we can tolerate a host that is completely down.
    * @throws Exception


Mime
View raw message