lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From thelabd...@apache.org
Subject svn commit: r1649154 - in /lucene/dev/trunk/solr: ./ core/src/test/org/apache/solr/cloud/ test-framework/src/java/org/apache/solr/cloud/
Date Sat, 03 Jan 2015 00:20:47 GMT
Author: thelabdude
Date: Sat Jan  3 00:20:46 2015
New Revision: 1649154

URL: http://svn.apache.org/r1649154
Log:
SOLR-6874: There is a race around SocketProxy binding to it's port the way we setup JettySolrRunner
and SocketProxy.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1649154&r1=1649153&r2=1649154&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sat Jan  3 00:20:46 2015
@@ -381,6 +381,9 @@ Bug Fixes
 
 * SOLR-6779: fix /browse for schemaless example (ehatcher)
 
+* SOLR-6874: There is a race around SocketProxy binding to it's port the way we setup 
+  JettySolrRunner and SocketProxy. (Mark Miller, Timothy Potter)
+
 
 Optimizations
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java?rev=1649154&r1=1649153&r2=1649154&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java Sat
Jan  3 00:20:46 2015
@@ -55,9 +55,7 @@ public class ReplicationFactorTest exten
   
   private static final transient Logger log = 
       LoggerFactory.getLogger(ReplicationFactorTest.class);
-  
-  private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
-  
+
   public ReplicationFactorTest() {
     super();
     sliceCount = 3;
@@ -103,25 +101,8 @@ public class ReplicationFactorTest exten
   public JettySolrRunner createJetty(File solrHome, String dataDir,
       String shardList, String solrConfigOverride, String schemaOverride)
       throws Exception {
-    
-    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
-        0, solrConfigOverride, schemaOverride, false,
-        getExtraServlets(), sslConfig, getExtraRequestFilters());
-    jetty.setShards(shardList);
-    jetty.setDataDir(getDataDir(dataDir));
-    
-    // setup to proxy Http requests to this server unless it is the control
-    // server
-    int proxyPort = getNextAvailablePort();
-    jetty.setProxyPort(proxyPort);
-    
-    jetty.start();
-    
-    // create a socket proxy for the jetty server ...
-    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
-    proxies.put(proxy.getUrl(), proxy);
-    
-    return jetty;
+
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
   }
   
   protected int getNextAvailablePort() throws Exception {    
@@ -320,21 +301,7 @@ public class ReplicationFactorTest exten
     Thread.sleep(2000);
     ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor,
30);    
   } 
-    
-  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
-    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-    assertNotNull(replicaBaseUrl);
-    URL baseUrl = new URL(replicaBaseUrl);
-    
-    SocketProxy proxy = proxies.get(baseUrl.toURI());
-    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
-      baseUrl = new URL(baseUrl.toExternalForm() + "/");
-      proxy = proxies.get(baseUrl.toURI());
-    }
-    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
-    return proxy;
-  }
-      
+
   protected int sendDoc(int docId, int minRf) throws Exception {
     UpdateRequest up = new UpdateRequest();
     up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1649154&r1=1649153&r2=1649154&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
(original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
Sat Jan  3 00:20:46 2015
@@ -536,16 +536,11 @@ public abstract class AbstractFullDistri
     jetty.setShards(shardList);
     jetty.setDataDir(getDataDir(dataDir));
 
-    // setup to proxy Http requests to this server unless it is the control
-    // server
-    int proxyPort = getNextAvailablePort();
-    jetty.setProxyPort(proxyPort);
+    SocketProxy proxy = new SocketProxy(0, sslConfig == null ? false : sslConfig.isSSLMode());
+    jetty.setProxyPort(proxy.getListenPort());
     jetty.start();
-
-    // create a socket proxy for the jetty server ...
-    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
+    proxy.open(jetty.getBaseUrl().toURI());
     proxies.put(proxy.getUrl(), proxy);
-
     return jetty;
   }
 

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java?rev=1649154&r1=1649153&r2=1649154&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java Sat
Jan  3 00:20:46 2015
@@ -59,24 +59,40 @@ public class SocketProxy {
   
   public List<Bridge> connections = new LinkedList<Bridge>();
   
-  private int listenPort = 0;
+  private final int listenPort;
   
   private int receiveBufferSize = -1;
   
   private boolean pauseAtStart = false;
   
   private int acceptBacklog = 50;
+
+  private boolean usesSSL;
+
+  public SocketProxy() throws Exception {
+    this(0, false);
+  }
   
-  public SocketProxy() throws Exception {}
+  public SocketProxy( boolean useSSL) throws Exception {
+    this(0, useSSL);
+  }
   
-  public SocketProxy(URI uri) throws Exception {
-    this(0, uri);
+  public SocketProxy(int port, boolean useSSL) throws Exception {
+    int listenPort = port;
+    this.usesSSL = useSSL;
+    serverSocket = createServerSocket(useSSL);
+    serverSocket.setReuseAddress(true);
+    if (receiveBufferSize > 0) {
+      serverSocket.setReceiveBufferSize(receiveBufferSize);
+    }
+    serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
+    this.listenPort = serverSocket.getLocalPort();
   }
   
-  public SocketProxy(int port, URI uri) throws Exception {
-    listenPort = port;
+  public void open(URI uri) throws Exception {
     target = uri;
-    open();
+    proxyUrl = urlFromSocket(target, serverSocket);
+    doOpen();
   }
   
   public String toString() {
@@ -91,18 +107,8 @@ public class SocketProxy {
     target = tcpBrokerUri;
   }
   
-  public void open() throws Exception {
-    serverSocket = createServerSocket(target);
-    serverSocket.setReuseAddress(true);
-    if (receiveBufferSize > 0) {
-      serverSocket.setReceiveBufferSize(receiveBufferSize);
-    }
-    if (proxyUrl == null) {
-      serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
-      proxyUrl = urlFromSocket(target, serverSocket);
-    } else {
-      serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
-    }
+  private void doOpen() throws Exception {
+    
     acceptor = new Acceptor(serverSocket, target);
     if (pauseAtStart) {
       acceptor.pause();
@@ -112,19 +118,19 @@ public class SocketProxy {
     closed = new CountDownLatch(1);
   }
   
-  private boolean isSsl(URI target) {
-    return "ssl".equals(target.getScheme());
+  public int getListenPort() {
+    return listenPort;
   }
   
-  private ServerSocket createServerSocket(URI target) throws Exception {
-    if (isSsl(target)) {
+  private ServerSocket createServerSocket(boolean useSSL) throws Exception {
+    if (useSSL) {
       return SSLServerSocketFactory.getDefault().createServerSocket();
     }
     return new ServerSocket();
   }
   
-  private Socket createSocket(URI target) throws Exception {
-    if (isSsl(target)) {
+  private Socket createSocket(boolean useSSL) throws Exception {
+    if (useSSL) {
       return SSLSocketFactory.getDefault().createSocket();
     }
     return new Socket();
@@ -175,7 +181,16 @@ public class SocketProxy {
   public void reopen() {
     log.info("Re-opening connectivity to "+getUrl());
     try {
-      open();
+      if (proxyUrl == null) {
+        throw new IllegalStateException("Can not call open before open(URI uri).");
+      }
+      serverSocket = createServerSocket(usesSSL);
+      serverSocket.setReuseAddress(true);
+      if (receiveBufferSize > 0) {
+        serverSocket.setReceiveBufferSize(receiveBufferSize);
+      }
+      serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
+      doOpen();
     } catch (Exception e) {
       log.debug("exception on reopen url:" + getUrl(), e);
     }
@@ -257,7 +272,7 @@ public class SocketProxy {
     
     public Bridge(Socket socket, URI target) throws Exception {
       receiveSocket = socket;
-      sendSocket = createSocket(target);
+      sendSocket = createSocket(usesSSL);
       if (receiveBufferSize > 0) {
         sendSocket.setReceiveBufferSize(receiveBufferSize);
       }
@@ -291,9 +306,9 @@ public class SocketProxy {
     }
     
     private void linkWithThreads(Socket source, Socket dest) {
-      requestThread = new Pump(source, dest);
+      requestThread = new Pump("Request", source, dest);
       requestThread.start();
-      responseThread = new Pump(dest, source);
+      responseThread = new Pump("Response", dest, source);
       responseThread.start();
     }
     
@@ -303,8 +318,8 @@ public class SocketProxy {
       private Socket destination;
       private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
       
-      public Pump(Socket source, Socket dest) {
-        super("SocketProxy-DataTransfer-" + source.getPort() + ":"
+      public Pump(String kind, Socket source, Socket dest) {
+        super("SocketProxy-"+kind+"-" + source.getPort() + ":"
             + dest.getPort());
         src = source;
         destination = dest;
@@ -321,17 +336,34 @@ public class SocketProxy {
       
       public void run() {
         byte[] buf = new byte[1024];
+
+        try {
+          src.setSoTimeout(10 * 1000);
+        } catch (SocketException e) {
+          log.error("Failed to set socket timeout on "+src+" due to: "+e);
+          throw new RuntimeException(e);
+        }
+
+        InputStream in = null;
+        OutputStream out = null;
         try {
-          InputStream in = src.getInputStream();
-          OutputStream out = destination.getOutputStream();
+          in = src.getInputStream();
+          out = destination.getOutputStream();
           while (true) {
-            int len = in.read(buf);
+            int len = -1;
+            try {
+              len = in.read(buf);
+            } catch (SocketTimeoutException ste) {
+              log.warn(ste+" when reading from "+src);
+            }
+
             if (len == -1) {
               log.debug("read eof from:" + src);
               break;
             }
             pause.get().await();
-            out.write(buf, 0, len);
+            if (len > 0)
+              out.write(buf, 0, len);
           }
         } catch (Exception e) {
           log.debug("read/write failed, reason: " + e.getLocalizedMessage());
@@ -342,6 +374,21 @@ public class SocketProxy {
               close();
             }
           } catch (Exception ignore) {}
+        } finally {
+          if (in != null) {
+            try {
+              in.close();
+            } catch (Exception exc) {
+              log.debug(exc+" when closing InputStream on socket: "+src);
+            }
+          }
+          if (out != null) {
+            try {
+              out.close();
+            } catch (Exception exc) {
+              log.debug(exc+" when closing OutputStream on socket: "+destination);
+            }
+          }
         }
       }
     }



Mime
View raw message