Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 62A8EF036 for ; Wed, 8 May 2013 23:03:08 +0000 (UTC) Received: (qmail 82120 invoked by uid 500); 8 May 2013 23:03:08 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 82090 invoked by uid 500); 8 May 2013 23:03:08 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 82082 invoked by uid 99); 8 May 2013 23:03:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 May 2013 23:03:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 14E4E88A1F0; Wed, 8 May 2013 23:03:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@flume.apache.org Message-Id: <067c9e3e838540f2b91ffb4df3de7659@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1823. LoadBalancingRpcClient method must throw exception if it is called after close is called. Date: Wed, 8 May 2013 23:03:08 +0000 (UTC) Updated Branches: refs/heads/flume-1.4 a0127f693 -> b2b45edda FLUME-1823. LoadBalancingRpcClient method must throw exception if it is called after close is called. 
(Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b2b45edd Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b2b45edd Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b2b45edd Branch: refs/heads/flume-1.4 Commit: b2b45eddad92baae6c2d2958b98b4311217f242c Parents: a0127f6 Author: Mike Percy Authored: Wed May 8 16:02:37 2013 -0700 Committer: Mike Percy Committed: Wed May 8 16:02:57 2013 -0700 ---------------------------------------------------------------------- .../apache/flume/api/LoadBalancingRpcClient.java | 19 +++++-- .../flume/api/TestLoadBalancingRpcClient.java | 41 +++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b2b45edd/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java index f396104..e5fcc36 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java @@ -57,9 +57,11 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { private HostSelector selector; private Map<String, RpcClient> clientMap; private Properties configurationProperties; + private volatile boolean isOpen = false; @Override public void append(Event event) throws EventDeliveryException { + throwIfClosed(); boolean eventSent = false; Iterator<HostInfo> it = selector.createHostIterator(); @@ -83,6 +85,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { @Override public void appendBatch(List<Event> events) throws EventDeliveryException { + throwIfClosed(); boolean batchSent = false; 
Iterator<HostInfo> it = selector.createHostIterator(); @@ -106,13 +109,18 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { @Override public boolean isActive() { - // This client is always active and does not need to be replaced. - // Internally it will test the delegates and replace them where needed. - return true; + return isOpen; + } + + private void throwIfClosed() throws EventDeliveryException { + if (!isOpen) { + throw new EventDeliveryException("Rpc Client is closed"); + } } @Override public void close() throws FlumeException { + isOpen = false; synchronized (this) { Iterator<String> it = clientMap.keySet().iterator(); while (it.hasNext()) { @@ -177,11 +185,12 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { } selector.setHosts(hosts); + isOpen = true; } private synchronized RpcClient getClient(HostInfo info) - throws FlumeException { - + throws FlumeException, EventDeliveryException { + throwIfClosed(); String name = info.getReferenceName(); RpcClient client = clientMap.get(name); if (client == null) { http://git-wip-us.apache.org/repos/asf/flume/blob/b2b45edd/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java index 9071734..5d6828b 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java @@ -97,6 +97,47 @@ public class TestLoadBalancingRpcClient { } } + // This will fail without FLUME-1823 + @Test(expected = EventDeliveryException.class) + public void testTwoHostFailoverThrowAfterClose() throws Exception { + Server s1 = null, s2 = null; + RpcClient c = null; + try{ + LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); + 
LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler(); + + s1 = RpcTestUtils.startServer(h1); + s2 = RpcTestUtils.startServer(h2); + + Properties p = new Properties(); + p.put("hosts", "h1 h2"); + p.put("client.type", "default_loadbalance"); + p.put("hosts.h1", "127.0.0.1:" + s1.getPort()); + p.put("hosts.h2", "127.0.0.1:" + s2.getPort()); + + c = RpcClientFactory.getInstance(p); + Assert.assertTrue(c instanceof LoadBalancingRpcClient); + + for (int i = 0; i < 100; i++) { + if (i == 20) { + h2.setFailed(); + } else if (i == 40) { + h2.setOK(); + } + c.append(getEvent(i)); + } + + Assert.assertEquals(60, h1.getAppendCount()); + Assert.assertEquals(40, h2.getAppendCount()); + if (c != null) c.close(); + c.append(getEvent(3)); + Assert.fail(); + } finally { + if (s1 != null) s1.close(); + if (s2 != null) s2.close(); + } + } + /** * Ensure that we can tolerate a host that is completely down. * @throws Exception