chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shre...@apache.org
Subject svn commit: r1606952 - in /chukwa/trunk: CHANGES.txt src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java
Date Tue, 01 Jul 2014 00:21:12 GMT
Author: shreyas
Date: Tue Jul  1 00:21:12 2014
New Revision: 1606952

URL: http://svn.apache.org/r1606952
Log:
CHUKWA-710. Set TCP socket reuse option for server sockets

Added:
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java
Modified:
    chukwa/trunk/CHANGES.txt
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java

Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1606952&r1=1606951&r2=1606952&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Tue Jul  1 00:21:12 2014
@@ -34,6 +34,8 @@ Release 0.6 - Unreleased
 
   IMPROVEMENTS
 
+    CHUKWA-710. Set TCP socket reuse option for server sockets. (Shreyas Subramanya)
+
     CHUKWA-698. Update RAT plugin to 0.10 release and fixed missing license header. (Eric Yang)
 
     CHUKWA-696. Improve hicc stylesheets for form input.  (Eric Yang)
@@ -60,8 +62,6 @@ Release 0.6 - Unreleased
 
     CHUKWA-708. Update website reference to top level project.  (Eric Yang)
 
-    CHUKWA-710. Set TCP socket reuse option for server sockets. (Shreyas Subramanya)
-
     CHUKWA-694. Improve demux configuration processing in HBaseWriter.  (Eric Yang)
 
     CHUKWA-692. Fixed race condition of agent startup in TestExecAdaptor.  (Eric Yang)

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java?rev=1606952&r1=1606951&r2=1606952&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java Tue Jul  1 00:21:12 2014
@@ -54,8 +54,11 @@ public class SocketAdaptor extends Abstr
     
     public void run() {
       try{
-        listener = new ServerSocket(port);
+        listener = new ServerSocket();
         listener.setReuseAddress(true);
+        bindWithExponentialBackoff(listener, port, 12000);
+        log.info("SocketAdaptor bound successfully to port:" + port);
+        
         Socket server;
 
         while(running){
@@ -66,6 +69,12 @@ public class SocketAdaptor extends Abstr
         }
       } catch (IOException ioe) {
         log.error("SocketAdaptor Dispatcher problem:", ioe);
+      } finally {
+        try {
+          listener.close();
+        } catch (IOException e) {
+          log.warn("IOException closing socket on port:" + port);
+        }
       }
     }
     
@@ -76,6 +85,33 @@ public class SocketAdaptor extends Abstr
         log.debug(ExceptionUtil.getStackTrace(e));
       }
     }
+    
+    protected void bindWithExponentialBackoff(ServerSocket ss, int p,
+        int maxDelay) throws IOException {
+      int backoff = 1000;
+      int waitedTime = 0;
+      while (!ss.isBound()) {
+        try {
+          ss.bind(new InetSocketAddress(p));
+        } catch (IOException bindEx) {
+          backoff *= 2;
+          log.warn("IOException in bind:" + bindEx);
+          log.warn("Retrying bind to port " + p + " in milliseconds:" + backoff);
+          try {
+            Thread.sleep(backoff);
+          } catch (InterruptedException e) {
+            throw new IOException(
+                "Interrupted while trying to connect to port:" + p);
+          }
+        }
+        waitedTime += backoff;
+        if (waitedTime > maxDelay) {
+          throw new IOException("Could not bind to port:" + p
+              + " after waiting " + waitedTime
+              + " milliseconds. Abandoning this SocketAdaptor.");
+        }
+      }
+    }
   }
   
   class Worker implements Runnable {

Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java?rev=1606952&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java (added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSocketAdaptor.java Tue Jul  1 00:21:12 2014
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import junit.framework.TestCase;
+
+public class TestSocketAdaptor extends TestCase {
+  public void testBindRetry() {
+    int port = 9181;
+    int delay = 120000;
+    ServerSocket sock1 = null;
+    ServerSocket sock2 = null;
+    SocketAdaptor adaptor = new SocketAdaptor();
+    SocketAdaptor.Dispatcher disp = adaptor.new Dispatcher(port);
+    // test failure case
+    try {
+      sock1 = new ServerSocket();
+      sock1.setReuseAddress(true);
+      sock1.bind(new InetSocketAddress(port));
+      System.out.println("Bound to " + port);
+      assertTrue(sock1.isBound());
+    } catch (IOException e) {
+      fail("IOException binding to " + port);
+    }
+    // now try binding to the same port through SocketAdaptor
+    // making sure we retry until the specified time of 120s
+    long startTime = System.currentTimeMillis();
+    try {
+      sock2 = new ServerSocket();
+      sock2.setReuseAddress(true);
+      disp.bindWithExponentialBackoff(sock2, port, delay);
+      // we should not reach this statement
+      assertTrue(!sock2.isBound());
+    } catch (IOException ioe) {
+      long retryInterval = System.currentTimeMillis() - startTime;
+      System.out.println("Retried number of milliseconds :" + retryInterval);
+      if (retryInterval < delay) {
+        fail("SocketAdaptor did not retry bind for milliseconds:" + delay);
+      }
+    } finally {
+      try {
+        if (sock1 != null)
+          sock1.close();
+      } catch (IOException ignore) {
+      }
+    }
+
+    // test successful case
+    startTime = System.currentTimeMillis();
+    try {
+      disp.bindWithExponentialBackoff(sock2, port, delay);
+    } catch (IOException ioe) {
+      fail("IOException when trying to bind for the second time");
+    }
+    assertTrue(sock2.isBound());
+    System.out.println("Binding successful in milliseconds:"
+        + (System.currentTimeMillis() - startTime));
+    if (sock2 != null) {
+      try {
+        sock2.close();
+      } catch (IOException ignore) {
+      }
+    }
+  }
+}



Mime
View raw message