Return-Path: Delivered-To: apmail-lucene-solr-commits-archive@minotaur.apache.org Received: (qmail 42618 invoked from network); 12 Jan 2010 01:21:31 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 12 Jan 2010 01:21:31 -0000 Received: (qmail 31612 invoked by uid 500); 12 Jan 2010 01:21:30 -0000 Delivered-To: apmail-lucene-solr-commits-archive@lucene.apache.org Received: (qmail 31540 invoked by uid 500); 12 Jan 2010 01:21:30 -0000 Mailing-List: contact solr-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: solr-dev@lucene.apache.org Delivered-To: mailing list solr-commits@lucene.apache.org Received: (qmail 31531 invoked by uid 99); 12 Jan 2010 01:21:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Jan 2010 01:21:30 +0000 X-ASF-Spam-Status: No, hits=-1997.4 required=10.0 tests=ALL_TRUSTED,FB_GET_MEDS,WEIRD_PORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Jan 2010 01:21:18 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id F331B2388906; Tue, 12 Jan 2010 01:20:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r898144 - in /lucene/solr/branches/cloud/src: java/org/apache/solr/handler/component/ solrj/org/apache/solr/client/solrj/impl/ test/org/apache/solr/ test/org/apache/solr/client/solrj/ test/org/apache/solr/cloud/ Date: Tue, 12 Jan 2010 01:20:55 -0000 To: solr-commits@lucene.apache.org From: yonik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100112012055.F331B2388906@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: yonik Date: Tue Jan 12 01:20:54 2010 New Revision: 898144 URL: http://svn.apache.org/viewvc?rev=898144&view=rev Log: 
SOLR-1698: load balanced distrib search Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=898144&r1=898143&r2=898144&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java Tue Jan 12 01:20:54 2010 @@ -21,6 +21,7 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.RTimer; import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; @@ -33,6 +34,7 @@ import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; import org.apache.solr.client.solrj.impl.BinaryResponseParser; +import org.apache.solr.client.solrj.impl.LBHttpSolrServer; import org.apache.solr.util.plugin.SolrCoreAware; import org.apache.solr.core.SolrCore; import org.apache.lucene.queryParser.ParseException; @@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory; import java.util.*; 
import java.util.concurrent.*; +import java.net.MalformedURLException; /** * @@ -353,6 +356,8 @@ static HttpClient client; + static Random r = new Random(); + static LBHttpSolrServer loadbalancer; static { MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager(); @@ -361,12 +366,23 @@ mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout); mgr.getParams().setSoTimeout(SearchHandler.soTimeout); // mgr.getParams().setStaleCheckingEnabled(false); - client = new HttpClient(mgr); + client = new HttpClient(mgr); + try { + loadbalancer = new LBHttpSolrServer(client); + } catch (MalformedURLException e) { + // should be impossible since we're not passing any URLs here + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e); + } } CompletionService completionService = new ExecutorCompletionService(commExecutor); Set> pending = new HashSet>(); + // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574") + // This is primarily to keep track of what order we should use to query the replicas of a shard + // so that we use the same replica for all phases of a distributed request. + Map> shardToURLs = new HashMap>(); + HttpCommComponent() { } @@ -390,7 +406,36 @@ } } + + // Not thread safe... don't use in Callable. + // Don't modify the returned URL list. 
+ private List getURLs(String shard) { + List urls = shardToURLs.get(shard); + if (urls==null) { + urls = StrUtils.splitSmart(shard,"|",true); + + // convert shard to URL + for (int i=0; i 1) + Collections.shuffle(urls, r); + shardToURLs.put(shard, urls); + } + return urls; + } + + void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { + // do this outside of the callable for thread safety reasons + final List urls = getURLs(shard); + Callable task = new Callable() { public ShardResponse call() throws Exception { @@ -402,13 +447,9 @@ long startTime = System.currentTimeMillis(); try { - // String url = "http://" + shard + "/select"; - String url = "http://" + shard; - params.remove(CommonParams.WT); // use default (currently javabin) params.remove(CommonParams.VERSION); - SolrServer server = new CommonsHttpSolrServer(url, client); // SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select"); // use generic request to avoid extra processing of queries QueryRequest req = new QueryRequest(params); @@ -416,10 +457,17 @@ // no need to set the response parser as binary is the default // req.setResponseParser(new BinaryResponseParser()); - // srsp.rsp = server.request(req); - // srsp.rsp = server.query(sreq.params); - ssr.nl = server.request(req); + if (urls.size() <= 1) { + String url = urls.get(0); + srsp.setShardAddress(url); + SolrServer server = new CommonsHttpSolrServer(url, client); + ssr.nl = server.request(req); + } else { + LBHttpSolrServer.Rsp rsp = loadbalancer.request(new LBHttpSolrServer.Req(req, urls)); + ssr.nl = rsp.getResponse(); + srsp.setShardAddress(rsp.getServer()); + } } catch (Throwable th) { srsp.setException(th); if (th instanceof SolrException) { Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java URL: 
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java?rev=898144&r1=898143&r2=898144&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java (original) +++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java Tue Jan 12 01:20:54 2010 @@ -20,63 +20,68 @@ import org.apache.solr.common.SolrException; public final class ShardResponse { - private ShardRequest req; - private String shard; - private String shardAddress; // the specific shard that this response was received from - private int rspCode; - private Throwable exception; - private SolrResponse rsp; - - public String toString() { - return "ShardResponse:{shard="+shard+",shardAddress="+shardAddress - +"\n\trequest=" + req - +"\n\tresponse=" + rsp - + (exception==null ? "" : "\n\texception="+ SolrException.toStr(exception)) - +"\n}"; - } - - public Throwable getException() - { - return exception; - } - - public ShardRequest getShardRequest() - { - return req; - } - - public SolrResponse getSolrResponse() - { - return rsp; - } - - public String getShard() - { - return shard; - } - - void setShardRequest(ShardRequest rsp) - { - this.req = rsp; - } - - void setSolrResponse(SolrResponse rsp) - { - this.rsp = rsp; - } - - void setShard(String shard) - { - this.shard = shard; - } - - void setException(Throwable exception) - { - this.exception = exception; - } - - void setResponseCode(int rspCode) - { - this.rspCode = rspCode; - } + private ShardRequest req; + private String shard; + private String shardAddress; // the specific shard that this response was received from + private int rspCode; + private Throwable exception; + private SolrResponse rsp; + + public String toString() { + return "ShardResponse:{shard="+shard+",shardAddress="+shardAddress + +"\n\trequest=" + req + +"\n\tresponse=" + rsp + + 
(exception==null ? "" : "\n\texception="+ SolrException.toStr(exception)) + +"\n}"; + } + + public Throwable getException() + { + return exception; + } + + public ShardRequest getShardRequest() + { + return req; + } + + public SolrResponse getSolrResponse() + { + return rsp; + } + + public String getShard() + { + return shard; + } + + void setShardRequest(ShardRequest rsp) + { + this.req = rsp; + } + + void setSolrResponse(SolrResponse rsp) + { + this.rsp = rsp; + } + + void setShard(String shard) + { + this.shard = shard; + } + + void setException(Throwable exception) + { + this.exception = exception; + } + + void setResponseCode(int rspCode) + { + this.rspCode = rspCode; + } + + /** What was the shard address that returned this response. Example: "http://localhost:8983/solr" */ + public String getShardAddress() { return this.shardAddress; } + + void setShardAddress(String addr) { this.shardAddress = addr; } } Modified: lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=898144&r1=898143&r2=898144&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original) +++ lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Tue Jan 12 01:20:54 2010 @@ -27,26 +27,24 @@ import java.lang.ref.WeakReference; import java.net.MalformedURLException; import java.net.URL; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.*; /** - * LBHttpSolrServer or "LoadBalanced 
HttpSolrServer" is just a wrapper to CommonsHttpSolrServer. This is useful when you + * LBHttpSolrServer or "LoadBalanced HttpSolrServer" is a load balancing wrapper to CommonsHttpSolrServer. This is useful when you * have multiple SolrServers and the requests need to be Load Balanced among them. This should NOT be used for * indexing. Also see the wiki page. *

* It offers automatic failover when a server goes down and it detects when the server comes back up. *

- * Load balancing is done using a simple roundrobin on the list of servers. + * Load balancing is done using a simple round-robin on the list of servers. *

* If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server. * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds, - * andif not it fails. + * and if not it fails. *

  * SolrServer lbHttpSolrServer = new LBHttpSolrServer("http://host1:8080/solr/","http://host2:8080/solr","http://host2:8080/solr");
  * //or if you wish to pass the HttpClient do as follows
@@ -57,23 +55,33 @@
  * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
  * 

* When to use this?
This can be used as a software load balancer when you do not wish to setup an external - * load balancer. The code is relatively new and the API is currently experimental. Alternatives to this code are to use + * load balancer. Alternatives to this code are to use * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See Load balancing on Wikipedia * - * @version $Id$ * @since solr 1.4 */ public class LBHttpSolrServer extends SolrServer { - private final CopyOnWriteArrayList aliveServers = new CopyOnWriteArrayList(); - private final CopyOnWriteArrayList zombieServers = new CopyOnWriteArrayList(); + + + // keys to the maps are currently of the form "http://localhost:8983/solr" + // which should be equivalent to CommonsHttpSolrServer.getBaseURL() + private final Map aliveServers = new LinkedHashMap(); + // access to aliveServers should be synchronized on itself + + private final Map zombieServers = new ConcurrentHashMap(); + + // changes to aliveServers are reflected in this array, no need to synchronize + private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0]; + + private ScheduledExecutorService aliveCheckExecutor; private HttpClient httpClient; private final AtomicInteger counter = new AtomicInteger(-1); - private ReentrantLock checkLock = new ReentrantLock(); private static final SolrQuery solrQuery = new SolrQuery("*:*"); + private static final BinaryResponseParser binaryParser = new BinaryResponseParser(); static { solrQuery.setRows(0); @@ -82,8 +90,13 @@ private static class ServerWrapper { final CommonsHttpSolrServer solrServer; - // Used only by the thread in aliveCheckExecutor - long lastUsed, lastChecked; + long lastUsed; // last time used for a real request + long lastChecked; // last time checked for liveness + + // "standard" servers are used by default. They normally live in the alive list + // and move to the zombie list when unavailable. 
When they become available again, + // they move back to the alive list. + boolean standard = true; int failedPings = 0; @@ -94,33 +107,220 @@ public String toString() { return solrServer.getBaseURL(); } + + public String getKey() { + return solrServer.getBaseURL(); + } + + @Override + public int hashCode() { + return this.getKey().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof ServerWrapper)) return false; + return this.getKey().equals(((ServerWrapper)obj).getKey()); + } + } + + public static class Req { + protected SolrRequest request; + protected List servers; + protected int numDeadServersToTry; + + public Req(SolrRequest request, List servers) { + this.request = request; + this.servers = servers; + this.numDeadServersToTry = servers.size(); + } + + public SolrRequest getRequest() { + return request; + } + public List getServers() { + return servers; + } + + /** @return the number of dead servers to try if there are no live servers left */ + public int getNumDeadServersToTry() { + return numDeadServersToTry; + } + + /** Sets the number of dead servers to try if there are no live servers left. + * Defaults to the number of servers in this request. */ + public void setNumDeadServersToTry(int numDeadServersToTry) { + this.numDeadServersToTry = numDeadServersToTry; + } + } + + public static class Rsp { + protected String server; + protected NamedList rsp; + + /** The response from the server */ + public NamedList getResponse() { + return rsp; + } + + /** The server that returned the response */ + public String getServer() { + return server; + } } public LBHttpSolrServer(String... solrServerUrls) throws MalformedURLException { this(new HttpClient(new MultiThreadedHttpConnectionManager()), solrServerUrls); } + /** The provided httpClient should use a multi-threaded connection manager */ public LBHttpSolrServer(HttpClient httpClient, String... 
solrServerUrl) throws MalformedURLException { this(httpClient, new BinaryResponseParser(), solrServerUrl); } + /** The provided httpClient should use a multi-threaded connection manager */ public LBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl) throws MalformedURLException { this.httpClient = httpClient; for (String s : solrServerUrl) { - aliveServers.add(new ServerWrapper(new CommonsHttpSolrServer(s, httpClient, parser))); + ServerWrapper wrapper = new ServerWrapper(new CommonsHttpSolrServer(s, httpClient, parser)); + aliveServers.put(wrapper.getKey(), wrapper); + } + updateAliveList(); + } + + public static String normalize(String server) { + if (server.endsWith("/")) + server = server.substring(0, server.length() - 1); + return server; + } + + protected CommonsHttpSolrServer makeServer(String server) throws MalformedURLException { + return new CommonsHttpSolrServer(server, httpClient, binaryParser); + } + + + + /** + * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped. + * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of + * time, or until a test request on that server succeeds. + * + * Servers are queried in the exact order given (except servers currently in the dead pool are skipped). + * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried. + * Req.getNumDeadServersToTry() controls how many dead servers will be tried. + * + * If no live servers are found a SolrServerException is thrown. 
+ * + * @param req contains both the request as well as the list of servers to query + * + * @return the result of the request + * + * @throws SolrServerException + * @throws IOException + */ + public Rsp request(Req req) throws SolrServerException, IOException { + Rsp rsp = new Rsp(); + Exception ex = null; + + List skipped = new ArrayList(req.getNumDeadServersToTry()); + + for (String serverStr : req.getServers()) { + serverStr = normalize(serverStr); + // if the server is currently a zombie, just skip to the next one + ServerWrapper wrapper = zombieServers.get(serverStr); + if (wrapper != null) { + // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr); + if (skipped.size() < req.getNumDeadServersToTry()) + skipped.add(wrapper); + continue; + } + rsp.server = serverStr; + CommonsHttpSolrServer server = makeServer(serverStr); + + try { + rsp.rsp = server.request(req.getRequest()); + return rsp; // SUCCESS + } catch (SolrException e) { + // Server is alive but the request was malformed or invalid + throw e; + } catch (SolrServerException e) { + if (e.getRootCause() instanceof IOException) { + ex = e; + wrapper = new ServerWrapper(server); + wrapper.lastUsed = System.currentTimeMillis(); + wrapper.standard = false; + zombieServers.put(wrapper.getKey(), wrapper); + startAliveCheckExecutor(); + } else { + throw e; + } + } catch (Exception e) { + throw new SolrServerException(e); + } + } + + // try the servers we previously skipped + for (ServerWrapper wrapper : skipped) { + try { + rsp.rsp = wrapper.solrServer.request(req.getRequest()); + zombieServers.remove(wrapper.getKey()); + return rsp; // SUCCESS + } catch (SolrException e) { + // Server is alive but the request was malformed or invalid + zombieServers.remove(wrapper.getKey()); + throw e; + } catch (SolrServerException e) { + if (e.getRootCause() instanceof IOException) { + ex = e; + // already a zombie, no need to re-add + } else { + throw e; + } + } catch (Exception e) { + throw new 
SolrServerException(e); + } + } + + + if (ex == null) { + throw new SolrServerException("No live SolrServers available to handle this request"); + } else { + throw new SolrServerException("No live SolrServers available to handle this request", ex); + } + + } + + + + private void updateAliveList() { + synchronized (aliveServers) { + aliveServerList = aliveServers.values().toArray(new ServerWrapper[aliveServers.size()]); + } + } + + private ServerWrapper removeFromAlive(String key) { + synchronized (aliveServers) { + ServerWrapper wrapper = aliveServers.remove(key); + if (wrapper != null) + updateAliveList(); + return wrapper; + } + } + + private void addToAlive(ServerWrapper wrapper) { + synchronized (aliveServers) { + ServerWrapper prev = aliveServers.put(wrapper.getKey(), wrapper); + // TODO: warn if there was a previous entry? + updateAliveList(); } } public void addSolrServer(String server) throws MalformedURLException { CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(server, httpClient); - checkLock.lock(); - try { - aliveServers.add(new ServerWrapper(solrServer)); - } finally { - checkLock.unlock(); - } + addToAlive(new ServerWrapper(solrServer)); } public String removeSolrServer(String server) { @@ -132,25 +332,11 @@ if (server.endsWith("/")) { server = server.substring(0, server.length() - 1); } - this.checkLock.lock(); - try { - for (ServerWrapper serverWrapper : aliveServers) { - if (serverWrapper.solrServer.getBaseURL().equals(server)) { - aliveServers.remove(serverWrapper); - return serverWrapper.solrServer.getBaseURL(); - } - } - if (zombieServers.isEmpty()) return null; - for (ServerWrapper serverWrapper : zombieServers) { - if (serverWrapper.solrServer.getBaseURL().equals(server)) { - zombieServers.remove(serverWrapper); - return serverWrapper.solrServer.getBaseURL(); - } - } - } finally { - checkLock.unlock(); - } + // there is a small race condition here - if the server is in the process of being moved between + // lists, we could fail 
to remove it. + removeFromAlive(server); + zombieServers.remove(server); return null; } @@ -174,9 +360,10 @@ } /** - * Tries to query a live server. If no live servers are found it throws a SolrServerException. If the request failed - * due to IOException then the live server is moved to dead pool and the request is retried on another live server if - * available. If all live servers are exhausted then a SolrServerException is thrown. + * Tries to query a live server. A SolrServerException is thrown if all servers are dead. + * If the request failed due to IOException then the live server is moved to dead pool and the request is + * retried on another live server. After live servers are exhausted, any servers previously marked as dead + * will be tried before failing the request. * * @param request the SolrRequest. * @@ -187,41 +374,69 @@ */ public NamedList request(final SolrRequest request) throws SolrServerException, IOException { - int count = counter.incrementAndGet(); - int attempts = 0; - Exception ex; - int startSize = aliveServers.size(); - while (true) { - int size = aliveServers.size(); - if (size < 1) throw new SolrServerException("No live SolrServers available to handle this request"); - ServerWrapper solrServer; + Exception ex = null; + ServerWrapper[] serverList = aliveServerList; + + int maxTries = serverList.length; + Map justFailed = null; + + for (int attempts=0; attempts(); + justFailed.put(wrapper.getKey(), wrapper); + } else { + throw e; + } + } catch (Exception e) { + throw new SolrServerException(e); } + } + + + // try other standard servers that we didn't try just now + for (ServerWrapper wrapper : zombieServers.values()) { + if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue; try { - return solrServer.solrServer.request(request); + NamedList rsp = wrapper.solrServer.request(request); + // remove from zombie list *before* adding to alive to avoid a race that could lose a server + 
zombieServers.remove(wrapper.getKey()); + addToAlive(wrapper); + return rsp; } catch (SolrException e) { // Server is alive but the request was malformed or invalid throw e; } catch (SolrServerException e) { if (e.getRootCause() instanceof IOException) { ex = e; - moveAliveToDead(solrServer); + // still dead } else { throw e; } } catch (Exception e) { throw new SolrServerException(e); } - attempts++; - if (attempts >= startSize) - throw new SolrServerException("No live SolrServers available to handle this request", ex); } - } + + if (ex == null) { + throw new SolrServerException("No live SolrServers available to handle this request"); + } else { + throw new SolrServerException("No live SolrServers available to handle this request", ex); + } + } + /** * Takes up one dead server and check for aliveness. The check is done in a roundrobin. Each server is checked for * aliveness once in 'x' millis where x is decided by the setAliveCheckinterval() or it is defaulted to 1 minute @@ -230,39 +445,44 @@ */ private void checkAZombieServer(ServerWrapper zombieServer) { long currTime = System.currentTimeMillis(); - checkLock.lock(); try { zombieServer.lastChecked = currTime; QueryResponse resp = zombieServer.solrServer.query(solrQuery); if (resp.getStatus() == 0) { - //server has come back up - zombieServer.lastUsed = currTime; - zombieServers.remove(zombieServer); - aliveServers.add(zombieServer); - zombieServer.failedPings = 0; + // server has come back up. + // make sure to remove from zombies before adding to alive to avoid a race condition + // where another thread could mark it down, move it back to zombie, an then we delete + // from zombie and lose it forever. + ServerWrapper wrapper = zombieServers.remove(zombieServer.getKey()); + if (wrapper != null) { + wrapper.failedPings = 0; + if (wrapper.standard) { + addToAlive(wrapper); + } + } else { + // something else already moved the server from zombie to alive + } } } catch (Exception e) { + //Expected. 
The server is still down. zombieServer.failedPings++; - //Expected . The server is still down - } finally { - checkLock.unlock(); - } - } - private void moveAliveToDead(ServerWrapper solrServer) { - checkLock.lock(); - try { - boolean result = aliveServers.remove(solrServer); - if (result) { - if (zombieServers.addIfAbsent(solrServer)) { - startAliveCheckExecutor(); - } + // If the server doesn't belong in the standard set belonging to this load balancer + // then simply drop it after a certain number of failed pings. + if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) { + zombieServers.remove(zombieServer.getKey()); } - } finally { - checkLock.unlock(); } } + private void moveAliveToDead(ServerWrapper wrapper) { + wrapper = removeFromAlive(wrapper.getKey()); + if (wrapper == null) + return; // another thread already detected the failure and removed it + zombieServers.put(wrapper.getKey(), wrapper); + startAliveCheckExecutor(); + } + private int interval = CHECK_INTERVAL; /** @@ -280,6 +500,8 @@ } private void startAliveCheckExecutor() { + // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor + // if it's not null. 
if (aliveCheckExecutor == null) { synchronized (this) { if (aliveCheckExecutor == null) { @@ -292,13 +514,13 @@ } } - private static Runnable getAliveCheckRunner(final WeakReference lbHttpSolrServer) { + private static Runnable getAliveCheckRunner(final WeakReference lbRef) { return new Runnable() { public void run() { - LBHttpSolrServer solrServer = lbHttpSolrServer.get(); - if (solrServer != null && solrServer.zombieServers != null) { - for (ServerWrapper zombieServer : solrServer.zombieServers) { - solrServer.checkAZombieServer(zombieServer); + LBHttpSolrServer lb = lbRef.get(); + if (lb != null && lb.zombieServers != null) { + for (ServerWrapper zombieServer : lb.zombieServers.values()) { + lb.checkAZombieServer(zombieServer); } } } @@ -318,5 +540,7 @@ } } + // defaults private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks + private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list } Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java?rev=898144&r1=898143&r2=898144&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java (original) +++ lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java Tue Jan 12 01:20:54 2010 @@ -2,15 +2,7 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; +import java.util.*; import junit.framework.TestCase; @@ -50,6 +42,8 @@ protected List jettys = new ArrayList(); protected String context = "/solr"; protected 
String shards; + protected String[] shardsArr; + protected String[] deadServers = {"localhost:33331/solr","localhost:33332/solr"}; protected File testDir; protected SolrServer controlClient; protected int portSeed; @@ -150,18 +144,43 @@ controlJetty = createJetty(testDir, "control"); controlClient = createNewSolrServer(controlJetty.getLocalPort()); + shardsArr = new String[numShards]; StringBuilder sb = new StringBuilder(); - for (int i = 1; i <= numShards; i++) { + for (int i = 0; i < numShards; i++) { if (sb.length() > 0) sb.append(','); JettySolrRunner j = createJetty(testDir, "shard" + i); jettys.add(j); clients.add(createNewSolrServer(j.getLocalPort())); - sb.append("localhost:").append(j.getLocalPort()).append(context); + String shardStr = "localhost:" + j.getLocalPort() + context; + shardsArr[i] = shardStr; + sb.append(shardStr); } shards = sb.toString(); } + protected String getShardsString() { + if (deadServers == null) return shards; + + StringBuilder sb = new StringBuilder(); + for (String shard : shardsArr) { + if (sb.length() > 0) sb.append(','); + int nDeadServers = r.nextInt(deadServers.length+1); + if (nDeadServers > 0) { + List replicas = new ArrayList(Arrays.asList(deadServers)); + Collections.shuffle(replicas, r); + replicas.add(r.nextInt(nDeadServers+1), shard); + for (int i=0; i serverList = new LinkedList(); for (int i = 0; i < solr.length; i++) { - s[i] = solr[i].getUrl(); + serverList.add(solr[i].getUrl()); } - LBHttpSolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient, s); - lbHttpSolrServer.setAliveCheckInterval(500); + String[] servers = serverList.toArray(new String[serverList.size()]); + + LBHttpSolrServer lb = new LBHttpSolrServer(httpClient, servers); + lb.setAliveCheckInterval(500); + LBHttpSolrServer lb2 = new LBHttpSolrServer(httpClient); + lb2.setAliveCheckInterval(500); + + SolrQuery solrQuery = new SolrQuery("*:*"); + SolrRequest solrRequest = new QueryRequest(solrQuery); Set names = new HashSet(); QueryResponse resp = 
null; - for (String value : s) { - resp = lbHttpSolrServer.query(solrQuery); + for (String server : servers) { + resp = lb.query(solrQuery); + assertEquals(10, resp.getResults().getNumFound()); + names.add(resp.getResults().get(0).getFieldValue("name").toString()); + } + assertEquals(3, names.size()); + + // Now test through the advanced API + names.clear(); + for (String server : servers) { + LBHttpSolrServer.Rsp rsp = lb2.request(new LBHttpSolrServer.Req(solrRequest, serverList)); + // make sure the response came from the first in the list + assertEquals(rsp.getServer(), serverList.getFirst()); + resp = new QueryResponse(rsp.getResponse(), lb); assertEquals(10, resp.getResults().getNumFound()); names.add(resp.getResults().get(0).getFieldValue("name").toString()); + + // rotate the server list + serverList.addLast(serverList.removeFirst()); } assertEquals(3, names.size()); + // Kill a server and test again solr[1].jetty.stop(); solr[1].jetty = null; names.clear(); - for (String value : s) { - resp = lbHttpSolrServer.query(solrQuery); + for (String server : servers) { + resp = lb.query(solrQuery); assertEquals(10, resp.getResults().getNumFound()); names.add(resp.getResults().get(0).getFieldValue("name").toString()); } assertEquals(2, names.size()); assertFalse(names.contains("solr1")); + + // Now test through the advanced API + names.clear(); + for (String server : servers) { + LBHttpSolrServer.Rsp rsp = lb2.request(new LBHttpSolrServer.Req(solrRequest, serverList)); + resp = new QueryResponse(rsp.getResponse(), lb); + assertFalse(rsp.getServer().contains("solr1")); + assertEquals(10, resp.getResults().getNumFound()); + names.add(resp.getResults().get(0).getFieldValue("name").toString()); + + // rotate the server list + serverList.addLast(serverList.removeFirst()); + } + assertEquals(2, names.size()); + assertFalse(names.contains("solr1")); + // Start the killed server once again - solr[1].startJetty(); + solr[1].startJetty(true); // Wait for the alive check to 
complete - Thread.sleep(1200); + Thread.sleep(600); + names.clear(); + for (String value : servers) { + resp = lb.query(solrQuery); + assertEquals(10, resp.getResults().getNumFound()); + names.add(resp.getResults().get(0).getFieldValue("name").toString()); + } + // System.out.println("SERVERNAMES="+names); + assertEquals(3, names.size()); + + // Now test through the advanced API names.clear(); - for (String value : s) { - resp = lbHttpSolrServer.query(solrQuery); + for (String server : servers) { + LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList); + LBHttpSolrServer.Rsp rsp = lb2.request(req); + // make sure the response came from the first in the list + assertEquals(rsp.getServer(), serverList.getFirst()); + resp = new QueryResponse(rsp.getResponse(), lb); assertEquals(10, resp.getResults().getNumFound()); names.add(resp.getResults().get(0).getFieldValue("name").toString()); + + // rotate the server list + serverList.addLast(serverList.removeFirst()); } assertEquals(3, names.size()); + + + // slow LB for Simple API + LBHttpSolrServer slowLB = new LBHttpSolrServer(httpClient, servers); + slowLB.setAliveCheckInterval(1000000000); + + // slow LB for advanced API + LBHttpSolrServer slowLB2 = new LBHttpSolrServer(httpClient); + slowLB2.setAliveCheckInterval(1000000000); + + // stop all solr servers + for (SolrInstance solrInstance : solr) { + solrInstance.jetty.stop(); + solrInstance.jetty = null; + } + + try { + resp = slowLB.query(solrQuery); + TestCase.fail(); // all servers should be down + } catch (SolrServerException e) { + // expected + } + + try { + LBHttpSolrServer.Rsp rsp = slowLB2.request(new LBHttpSolrServer.Req(solrRequest, serverList)); + TestCase.fail(); // all servers should be down + } catch (SolrServerException e) { + // expected + } + + // Start the killed server once again + solr[1].startJetty(true); + + // even though one of the servers is now up, the loadbalancer won't yet know this unless we ask + // it to try dead 
servers. + try { + LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList); + req.setNumDeadServersToTry(0); + LBHttpSolrServer.Rsp rsp = slowLB2.request(req); + TestCase.fail(); // all servers still should be marked as down + } catch (SolrServerException e) { + // expected + } + + // the default is to try dead servers if there are no live servers + { + resp = slowLB.query(solrQuery); + } + + // the default is to try dead servers if there are no live servers + { + LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList); + LBHttpSolrServer.Rsp rsp = slowLB2.request(req); + } + + // the last success should have removed the server from the dead server list, so + // the next request should succeed even if it doesn't try dead servers. + { + LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList); + req.setNumDeadServersToTry(0); + LBHttpSolrServer.Rsp rsp = slowLB2.request(req); + } + } - public void testTwoServers() throws Exception { + // this test is a subset of testSimple and is no longer needed + public void XtestTwoServers() throws Exception { LBHttpSolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient, solr[0].getUrl(), solr[1].getUrl()); lbHttpSolrServer.setAliveCheckInterval(500); SolrQuery solrQuery = new SolrQuery("*:*"); @@ -135,15 +255,9 @@ Assert.assertEquals("solr1", name); solr[1].jetty.stop(); solr[1].jetty = null; - solr[0].startJetty(); - Thread.sleep(1200); - try { - resp = lbHttpSolrServer.query(solrQuery); - } catch(SolrServerException e) { - // try again after a pause in case the error is lack of time to start server - Thread.sleep(3000); - resp = lbHttpSolrServer.query(solrQuery); - } + solr[0].startJetty(true); + Thread.sleep(600); + resp = lbHttpSolrServer.query(solrQuery); name = resp.getResults().get(0).getFieldValue("name").toString(); Assert.assertEquals("solr0", name); } @@ -217,10 +331,15 @@ } public void startJetty() throws Exception { + startJetty(false); + } + + + 
public void startJetty(boolean waitUntilUp) throws Exception { jetty = new JettySolrRunner("/solr", port); System.setProperty("solr.solr.home", getHomeDir()); System.setProperty("solr.data.dir", getDataDir()); - jetty.start(); + jetty.start(waitUntilUp); int newPort = jetty.getLocalPort(); if (port != 0 && newPort != port) { TestCase.fail("TESTING FAILURE: could not grab requested port."); Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=898144&r1=898143&r2=898144&view=diff ============================================================================== --- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original) +++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Tue Jan 12 01:20:54 2010 @@ -36,6 +36,9 @@ // we don't call super.setUp log.info("####SETUP_START " + getName()); portSeed = 13000; + + // TODO: HACK: inserting dead servers doesn't currently work with these tests + deadServers = null; System.setProperty("zkHost", AbstractZkTestCase.ZOO_KEEPER_ADDRESS); String zkDir = tmpDir.getAbsolutePath() + File.separator