Return-Path: X-Original-To: apmail-chukwa-commits-archive@www.apache.org Delivered-To: apmail-chukwa-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 22D4D117EB for ; Tue, 1 Jul 2014 00:21:38 +0000 (UTC) Received: (qmail 59913 invoked by uid 500); 1 Jul 2014 00:21:38 -0000 Delivered-To: apmail-chukwa-commits-archive@chukwa.apache.org Received: (qmail 59886 invoked by uid 500); 1 Jul 2014 00:21:38 -0000 Mailing-List: contact commits-help@chukwa.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@chukwa.apache.org Delivered-To: mailing list commits@chukwa.apache.org Received: (qmail 59875 invoked by uid 99); 1 Jul 2014 00:21:37 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Jul 2014 00:21:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Jul 2014 00:21:38 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id EA9DA2388868; Tue, 1 Jul 2014 00:21:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1606952 - in /chukwa/trunk: CHANGES.txt src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java Date: Tue, 01 Jul 2014 00:21:12 -0000 To: commits@chukwa.apache.org From: shreyas@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140701002112.EA9DA2388868@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: shreyas Date: Tue Jul 1 00:21:12 2014 New Revision: 1606952 URL: http://svn.apache.org/r1606952 Log: CHUKWA-710. Set TCP socket reuse option for server sockets Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java Modified: chukwa/trunk/CHANGES.txt chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java Modified: chukwa/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1606952&r1=1606951&r2=1606952&view=diff ============================================================================== --- chukwa/trunk/CHANGES.txt (original) +++ chukwa/trunk/CHANGES.txt Tue Jul 1 00:21:12 2014 @@ -34,6 +34,8 @@ Release 0.6 - Unreleased IMPROVEMENTS + CHUKWA-710. Set TCP socket reuse option for server sockets. (Shreyas Subramanya) + CHUKWA-698. Update RAT plugin to 0.10 release and fixed missing license header. (Eric Yang) CHUKWA-696. Improve hicc stylesheets for form input. (Eric Yang) @@ -60,8 +62,6 @@ Release 0.6 - Unreleased CHUKWA-708. Update website reference to top level project. (Eric Yang) - CHUKWA-710. Set TCP socket reuse option for server sockets. (Shreyas Subramanya) - CHUKWA-694. Improve demux configuration processing in HBaseWriter. (Eric Yang) CHUKWA-692. Fixed race condition of agent startup in TestExecAdaptor. (Eric Yang) Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java?rev=1606952&r1=1606951&r2=1606952&view=diff ============================================================================== --- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java (original) +++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java Tue Jul 1 00:21:12 2014 @@ -54,8 +54,11 @@ public class SocketAdaptor extends Abstr public void run() { try{ - listener = new ServerSocket(port); + listener = new ServerSocket(); listener.setReuseAddress(true); + bindWithExponentialBackoff(listener, port, 12000); + log.info("SocketAdaptor bound successfully to port:" + port); + Socket server; while(running){ @@ -66,6 +69,12 @@ public class SocketAdaptor extends Abstr } } catch (IOException ioe) { log.error("SocketAdaptor Dispatcher problem:", ioe); + } finally { + try { + listener.close(); + } catch (IOException e) { + log.warn("IOException closing socket on port:" + port); + } } } @@ -76,6 +85,33 @@ public class SocketAdaptor extends Abstr log.debug(ExceptionUtil.getStackTrace(e)); } } + + protected void bindWithExponentialBackoff(ServerSocket ss, int p, + int maxDelay) throws IOException { + int backoff = 1000; + int waitedTime = 0; + while (!ss.isBound()) { + try { + ss.bind(new InetSocketAddress(p)); + } catch (IOException bindEx) { + backoff *= 2; + log.warn("IOException in bind:" + bindEx); + log.warn("Retrying bind to port " + p + " in milliseconds:" + backoff); + try { + Thread.sleep(backoff); + } catch (InterruptedException e) { + throw new IOException( + "Interrupted while trying to connect to port:" + p); + } + } + waitedTime += backoff; + if (waitedTime > maxDelay) { + throw new IOException("Could not bind to port:" + p + + " after waiting " + waitedTime + + " milliseconds. Abandoning this SocketAdaptor."); + } + } + } } class Worker implements Runnable { Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java?rev=1606952&view=auto ============================================================================== --- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java (added) +++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java Tue Jul 1 00:21:12 2014 @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.chukwa.datacollection.adaptor; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; + +import junit.framework.TestCase; + +public class TestSocketAdaptor extends TestCase { + public void testBindRetry() { + int port = 9181; + int delay = 120000; + ServerSocket sock1 = null; + ServerSocket sock2 = null; + SocketAdaptor adaptor = new SocketAdaptor(); + SocketAdaptor.Dispatcher disp = adaptor.new Dispatcher(port); + // test failure case + try { + sock1 = new ServerSocket(); + sock1.setReuseAddress(true); + sock1.bind(new InetSocketAddress(port)); + System.out.println("Bound to " + port); + assertTrue(sock1.isBound()); + } catch (IOException e) { + fail("IOException binding to " + port); + } + // now try binding to the same port through SocketAdaptor + // making sure we retry until the specified time of 120s + long startTime = System.currentTimeMillis(); + try { + sock2 = new ServerSocket(); + sock2.setReuseAddress(true); + disp.bindWithExponentialBackoff(sock2, port, delay); + // we should not reach this statement + assertTrue(!sock2.isBound()); + } catch (IOException ioe) { + long retryInterval = System.currentTimeMillis() - startTime; + System.out.println("Retried number of milliseconds :" + retryInterval); + if (retryInterval < delay) { + fail("SocketAdaptor did not retry bind for milliseconds:" + delay); + } + } finally { + try { + if (sock1 != null) + sock1.close(); + } catch (IOException ignore) { + } + } + + // test successful case + startTime = System.currentTimeMillis(); + try { + disp.bindWithExponentialBackoff(sock2, port, delay); + } catch (IOException ioe) { + fail("IOException when trying to bind for the second time"); + } + assertTrue(sock2.isBound()); + System.out.println("Binding successful in milliseconds:" + + (System.currentTimeMillis() - startTime)); + if (sock2 != null) { + try { + sock2.close(); + } catch (IOException ignore) { + } + } + } +}