Return-Path: X-Original-To: apmail-lucene-commits-archive@www.apache.org Delivered-To: apmail-lucene-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1FB3F106BE for ; Sat, 3 Jan 2015 00:20:49 +0000 (UTC) Received: (qmail 30238 invoked by uid 500); 3 Jan 2015 00:20:49 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 30228 invoked by uid 99); 3 Jan 2015 00:20:49 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 03 Jan 2015 00:20:49 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id 41B11AC04B2; Sat, 3 Jan 2015 00:20:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1649154 - in /lucene/dev/trunk/solr: ./ core/src/test/org/apache/solr/cloud/ test-framework/src/java/org/apache/solr/cloud/ Date: Sat, 03 Jan 2015 00:20:47 -0000 To: commits@lucene.apache.org From: thelabdude@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150103002048.41B11AC04B2@hades.apache.org> Author: thelabdude Date: Sat Jan 3 00:20:46 2015 New Revision: 1649154 URL: http://svn.apache.org/r1649154 Log: SOLR-6874: There is a race around SocketProxy binding to its port the way we set up JettySolrRunner and SocketProxy. 
Modified: lucene/dev/trunk/solr/CHANGES.txt lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java Modified: lucene/dev/trunk/solr/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1649154&r1=1649153&r2=1649154&view=diff ============================================================================== --- lucene/dev/trunk/solr/CHANGES.txt (original) +++ lucene/dev/trunk/solr/CHANGES.txt Sat Jan 3 00:20:46 2015 @@ -381,6 +381,9 @@ Bug Fixes * SOLR-6779: fix /browse for schemaless example (ehatcher) +* SOLR-6874: There is a race around SocketProxy binding to it's port the way we setup + JettySolrRunner and SocketProxy. (Mark Miller, Timothy Potter) + Optimizations ---------------------- Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java?rev=1649154&r1=1649153&r2=1649154&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java (original) +++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java Sat Jan 3 00:20:46 2015 @@ -55,9 +55,7 @@ public class ReplicationFactorTest exten private static final transient Logger log = LoggerFactory.getLogger(ReplicationFactorTest.class); - - private Map proxies = new HashMap(); - + public ReplicationFactorTest() { super(); sliceCount = 3; @@ -103,25 +101,8 @@ public class ReplicationFactorTest exten public JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride) throws Exception { - - JettySolrRunner jetty = new 
JettySolrRunner(solrHome.getPath(), context, - 0, solrConfigOverride, schemaOverride, false, - getExtraServlets(), sslConfig, getExtraRequestFilters()); - jetty.setShards(shardList); - jetty.setDataDir(getDataDir(dataDir)); - - // setup to proxy Http requests to this server unless it is the control - // server - int proxyPort = getNextAvailablePort(); - jetty.setProxyPort(proxyPort); - - jetty.start(); - - // create a socket proxy for the jetty server ... - SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI()); - proxies.put(proxy.getUrl(), proxy); - - return jetty; + + return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride); } protected int getNextAvailablePort() throws Exception { @@ -320,21 +301,7 @@ public class ReplicationFactorTest exten Thread.sleep(2000); ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor, 30); } - - protected SocketProxy getProxyForReplica(Replica replica) throws Exception { - String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); - assertNotNull(replicaBaseUrl); - URL baseUrl = new URL(replicaBaseUrl); - - SocketProxy proxy = proxies.get(baseUrl.toURI()); - if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) { - baseUrl = new URL(baseUrl.toExternalForm() + "/"); - proxy = proxies.get(baseUrl.toURI()); - } - assertNotNull("No proxy found for " + baseUrl + "!", proxy); - return proxy; - } - + protected int sendDoc(int docId, int minRf) throws Exception { UpdateRequest up = new UpdateRequest(); up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1649154&r1=1649153&r2=1649154&view=diff ============================================================================== --- 
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original) +++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Sat Jan 3 00:20:46 2015 @@ -536,16 +536,11 @@ public abstract class AbstractFullDistri jetty.setShards(shardList); jetty.setDataDir(getDataDir(dataDir)); - // setup to proxy Http requests to this server unless it is the control - // server - int proxyPort = getNextAvailablePort(); - jetty.setProxyPort(proxyPort); + SocketProxy proxy = new SocketProxy(0, sslConfig == null ? false : sslConfig.isSSLMode()); + jetty.setProxyPort(proxy.getListenPort()); jetty.start(); - - // create a socket proxy for the jetty server ... - SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI()); + proxy.open(jetty.getBaseUrl().toURI()); proxies.put(proxy.getUrl(), proxy); - return jetty; } Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java?rev=1649154&r1=1649153&r2=1649154&view=diff ============================================================================== --- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java (original) +++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java Sat Jan 3 00:20:46 2015 @@ -59,24 +59,40 @@ public class SocketProxy { public List connections = new LinkedList(); - private int listenPort = 0; + private final int listenPort; private int receiveBufferSize = -1; private boolean pauseAtStart = false; private int acceptBacklog = 50; + + private boolean usesSSL; + + public SocketProxy() throws Exception { + this(0, false); + } - public SocketProxy() throws Exception {} + public SocketProxy( boolean useSSL) throws Exception { + this(0, useSSL); + } - public SocketProxy(URI uri) throws Exception { - 
this(0, uri); + public SocketProxy(int port, boolean useSSL) throws Exception { + int listenPort = port; + this.usesSSL = useSSL; + serverSocket = createServerSocket(useSSL); + serverSocket.setReuseAddress(true); + if (receiveBufferSize > 0) { + serverSocket.setReceiveBufferSize(receiveBufferSize); + } + serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog); + this.listenPort = serverSocket.getLocalPort(); } - public SocketProxy(int port, URI uri) throws Exception { - listenPort = port; + public void open(URI uri) throws Exception { target = uri; - open(); + proxyUrl = urlFromSocket(target, serverSocket); + doOpen(); } public String toString() { @@ -91,18 +107,8 @@ public class SocketProxy { target = tcpBrokerUri; } - public void open() throws Exception { - serverSocket = createServerSocket(target); - serverSocket.setReuseAddress(true); - if (receiveBufferSize > 0) { - serverSocket.setReceiveBufferSize(receiveBufferSize); - } - if (proxyUrl == null) { - serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog); - proxyUrl = urlFromSocket(target, serverSocket); - } else { - serverSocket.bind(new InetSocketAddress(proxyUrl.getPort())); - } + private void doOpen() throws Exception { + acceptor = new Acceptor(serverSocket, target); if (pauseAtStart) { acceptor.pause(); @@ -112,19 +118,19 @@ public class SocketProxy { closed = new CountDownLatch(1); } - private boolean isSsl(URI target) { - return "ssl".equals(target.getScheme()); + public int getListenPort() { + return listenPort; } - private ServerSocket createServerSocket(URI target) throws Exception { - if (isSsl(target)) { + private ServerSocket createServerSocket(boolean useSSL) throws Exception { + if (useSSL) { return SSLServerSocketFactory.getDefault().createServerSocket(); } return new ServerSocket(); } - private Socket createSocket(URI target) throws Exception { - if (isSsl(target)) { + private Socket createSocket(boolean useSSL) throws Exception { + if (useSSL) { return 
SSLSocketFactory.getDefault().createSocket(); } return new Socket(); @@ -175,7 +181,16 @@ public class SocketProxy { public void reopen() { log.info("Re-opening connectivity to "+getUrl()); try { - open(); + if (proxyUrl == null) { + throw new IllegalStateException("Can not call open before open(URI uri)."); + } + serverSocket = createServerSocket(usesSSL); + serverSocket.setReuseAddress(true); + if (receiveBufferSize > 0) { + serverSocket.setReceiveBufferSize(receiveBufferSize); + } + serverSocket.bind(new InetSocketAddress(proxyUrl.getPort())); + doOpen(); } catch (Exception e) { log.debug("exception on reopen url:" + getUrl(), e); } @@ -257,7 +272,7 @@ public class SocketProxy { public Bridge(Socket socket, URI target) throws Exception { receiveSocket = socket; - sendSocket = createSocket(target); + sendSocket = createSocket(usesSSL); if (receiveBufferSize > 0) { sendSocket.setReceiveBufferSize(receiveBufferSize); } @@ -291,9 +306,9 @@ public class SocketProxy { } private void linkWithThreads(Socket source, Socket dest) { - requestThread = new Pump(source, dest); + requestThread = new Pump("Request", source, dest); requestThread.start(); - responseThread = new Pump(dest, source); + responseThread = new Pump("Response", dest, source); responseThread.start(); } @@ -303,8 +318,8 @@ public class SocketProxy { private Socket destination; private AtomicReference pause = new AtomicReference(); - public Pump(Socket source, Socket dest) { - super("SocketProxy-DataTransfer-" + source.getPort() + ":" + public Pump(String kind, Socket source, Socket dest) { + super("SocketProxy-"+kind+"-" + source.getPort() + ":" + dest.getPort()); src = source; destination = dest; @@ -321,17 +336,34 @@ public class SocketProxy { public void run() { byte[] buf = new byte[1024]; + + try { + src.setSoTimeout(10 * 1000); + } catch (SocketException e) { + log.error("Failed to set socket timeout on "+src+" due to: "+e); + throw new RuntimeException(e); + } + + InputStream in = null; + 
OutputStream out = null; try { - InputStream in = src.getInputStream(); - OutputStream out = destination.getOutputStream(); + in = src.getInputStream(); + out = destination.getOutputStream(); while (true) { - int len = in.read(buf); + int len = -1; + try { + len = in.read(buf); + } catch (SocketTimeoutException ste) { + log.warn(ste+" when reading from "+src); + } + if (len == -1) { log.debug("read eof from:" + src); break; } pause.get().await(); - out.write(buf, 0, len); + if (len > 0) + out.write(buf, 0, len); } } catch (Exception e) { log.debug("read/write failed, reason: " + e.getLocalizedMessage()); @@ -342,6 +374,21 @@ public class SocketProxy { close(); } } catch (Exception ignore) {} + } finally { + if (in != null) { + try { + in.close(); + } catch (Exception exc) { + log.debug(exc+" when closing InputStream on socket: "+src); + } + } + if (out != null) { + try { + out.close(); + } catch (Exception exc) { + log.debug(exc+" when closing OutputStream on socket: "+destination); + } + } } } }