lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markrmil...@apache.org
Subject [07/26] lucene-solr:starburst: The Star Burst Upgrade - a work in progress - the branch gets replaced often.
Date Sun, 29 Jul 2018 15:08:29 GMT
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
index edc7269..11e384a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,8 +30,6 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 
-import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
-
 
 public class ReplicaInfo implements MapWriter {
   private final String name;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
index f1799c4..1b930b4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
@@ -17,6 +17,8 @@
 
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
+import static org.apache.solr.common.params.CoreAdminParams.NODE;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,8 +36,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
 
-import static org.apache.solr.common.params.CoreAdminParams.NODE;
-
 /**
  * Each instance represents a node in the cluster
  */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
index eb0c63c..12ae411 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
@@ -17,6 +17,8 @@
 
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
+import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -40,8 +42,6 @@ import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
 
-import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
-
 /* A suggester is capable of suggesting a collection operation
  * given a particular session. Before it suggests a new operation,
  * it ensures that ,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
new file mode 100644
index 0000000..96a3f55
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
@@ -0,0 +1,1081 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient.Abortable;
+import org.apache.solr.client.solrj.impl.Http2SolrClient.OnComplete;
+import org.apache.solr.client.solrj.impl.Http2SolrClient.RemoteExecutionException;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.client.solrj.util.SolrInternalHttpClient;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.QoSParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.slf4j.MDC;
+
+/**
+ * LBHttpSolrClient or "LoadBalanced HttpSolrClient" is a load balancing wrapper around
+ * {@link HttpSolrClient}. This is useful when you
+ * have multiple Solr servers and the requests need to be Load Balanced among them.
+ *
+ * Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the
+ * correct master; no inter-node routing is done.
+ *
+ * In SolrCloud (leader/replica) scenarios, it is usually better to use
+ * {@link CloudSolrClient}, but this class may be used
+ * for updates because the server will forward them to the appropriate leader.
+ *
+ * <p>
+ * It offers automatic failover when a server goes down and it detects when the server comes back up.
+ * <p>
+ * Load balancing is done using a simple round-robin on the list of servers.
+ * <p>
+ * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
+ * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
+ * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds,
+ * and if not it fails.
+ * <blockquote><pre>
+ * SolrClient lbHttpSolrClient = new LBHttpSolrClient("http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
+ * //or if you wish to pass the HttpClient do as follows
+ * HttpClient httpClient = new HttpClient();
+ * SolrClient lbHttpSolrClient = new LBHttpSolrClient(httpClient, "http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
+ * </pre></blockquote>
+ * This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
+ * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
+ * <p>
+ * <b>When to use this?</b><br> This can be used as a software load balancer when you do not wish to setup an external
+ * load balancer. Alternatives to this code are to use
+ * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
+ * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
+ *
+ * @since solr 1.4
+ */
+public class AsyncLBHttpSolrClient extends SolrClient {
+  /**
+   * HTTP status codes for which a retryable request is re-sent to another server
+   * (see the {@code SolrException} handling in {@code doRequest}).
+   */
+  private static final Set<Integer> RETRY_CODES = new HashSet<>(4);
+
+  static {
+    RETRY_CODES.add(404);
+    RETRY_CODES.add(403);
+    RETRY_CODES.add(503);
+    RETRY_CODES.add(500);
+  }
+
+  // keys to the maps are currently of the form "http://localhost:8983/solr"
+  // which should be equivalent to HttpSolrServer.getBaseURL()
+  private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
+  // access to aliveServers should be synchronized on itself
+  
+  // Servers currently considered dead, keyed by base URL.
+  protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
+
+  // changes to aliveServers are reflected in this array, no need to synchronize
+  private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
+  
+  //private Set<SolrClient> clients = new ConcurrentHashMap<>().newKeySet();
+
+  // Shut down in close() when non-null.
+  private ScheduledExecutorService aliveCheckExecutor;
+
+  // Either the client supplied through the Builder or one created by the constructor.
+  private final SolrInternalHttpClient httpClient;
+  // True when the Builder supplied no http client; close() then closes httpClient.
+  private final boolean clientIsInternal;
+  //private Http2SolrClient.Builder httpSolrClientBuilder;
+  // Round-robin cursor used to pick the next entry of aliveServerList.
+  private final AtomicInteger counter = new AtomicInteger(-1);
+
+  //private static final SolrQuery solrQuery = new SolrQuery("*:*");
+  // Initialised from the Builder's response parser.
+  private volatile ResponseParser parser;
+  private volatile RequestWriter requestWriter;
+
+  // Param keys that should only be sent via the query string (see setQueryParams).
+  private Set<String> queryParams = new HashSet<>();
+  // Taken from builder.connectionTimeoutMillis; may be null.
+  private Integer connectionTimeout;
+
+  // Taken from builder.socketTimeoutMillis; may be null.
+  private Integer soTimeout;
+
+  // The Builder this client was constructed from.
+  private Builder builder;
+
+  // Shared client handed out by makeSolrClient() for every server URL.
+  private Http2SolrClient solrClient;
+
+  // Intentionally empty static initializer (no effect).
+  static {
+
+  }
+
+  /**
+   * Associates a server base URL with the {@link Http2SolrClient} used to talk to it.
+   * Two wrappers are equal when their keys (base URLs) are equal.
+   */
+  protected static class ServerWrapper {
+
+    private final Http2SolrClient solrClient;
+
+    private String baseUrl;
+
+    // "standard" servers are the default kind: they normally sit in the alive list,
+    // are moved to the zombie list while unavailable, and return to the alive list
+    // once they respond again.
+    boolean standard = true;
+
+    int failedPings = 0;
+
+    public ServerWrapper(Http2SolrClient solrClient, String baseUrl) {
+      this.baseUrl = baseUrl;
+      this.solrClient = solrClient;
+    }
+
+    /** @return the map key for this server, i.e. its base URL */
+    public String getKey() {
+      return baseUrl;
+    }
+
+    /** Points the request at this server's base URL and executes it with the wrapped client. */
+    public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+      request.setBasePath(baseUrl);
+      NamedList<Object> response = solrClient.request(request, collection);
+      return response;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof ServerWrapper) {
+        ServerWrapper other = (ServerWrapper) obj;
+        return getKey().equals(other.getKey());
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      String key = getKey();
+      return key.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return baseUrl;
+    }
+  }
+
+  /**
+   * A request together with the ordered list of servers it may be sent to and
+   * the limits on how many servers are attempted.
+   */
+  public static class Req {
+    protected SolrRequest request;
+    protected List<String> servers;
+    protected int numDeadServersToTry;
+    private final Integer numServersToTry;
+
+    /** Creates a request with no explicit limit on the number of servers to try. */
+    public Req(SolrRequest request, List<String> servers) {
+      this(request, servers, null);
+    }
+
+    /**
+     * @param request the request to send
+     * @param servers candidate server URLs
+     * @param numServersToTry limit on servers to try, or {@code null} for no explicit limit
+     */
+    public Req(SolrRequest request, List<String> servers, Integer numServersToTry) {
+      this.request = request;
+      this.servers = servers;
+      this.numServersToTry = numServersToTry;
+      // by default every server of the request may be retried while dead
+      this.numDeadServersToTry = servers.size();
+    }
+
+    public SolrRequest getRequest() {
+      return this.request;
+    }
+
+    public List<String> getServers() {
+      return this.servers;
+    }
+
+    public Integer getNumServersToTry() {
+      return this.numServersToTry;
+    }
+
+    /** @return the number of dead servers to try if there are no live servers left */
+    public int getNumDeadServersToTry() {
+      return this.numDeadServersToTry;
+    }
+
+    /**
+     * @param numDeadServersToTry The number of dead servers to try if there are no live servers left.
+     * Defaults to the number of servers in this request.
+     */
+    public void setNumDeadServersToTry(int numDeadServersToTry) {
+      this.numDeadServersToTry = numDeadServersToTry;
+    }
+  }
+
+  /**
+   * Result holder for a load-balanced request: the server the request was sent to,
+   * the response body, and the handle of the in-flight request.
+   */
+  public static class Rsp {
+    public Abortable areq;
+    protected NamedList<Object> rsp;
+    protected String server;
+
+    /** The server that returned the response */
+    public String getServer() {
+      return this.server;
+    }
+
+    /** The response from the server */
+    public NamedList<Object> getResponse() {
+      return this.rsp;
+    }
+  }
+
+  /**
+   * Builds a {@link Builder} from the given arguments and delegates to
+   * {@link AsyncLBHttpSolrClient#AsyncLBHttpSolrClient(Builder, Http2SolrClient)}.
+   * The provided httpClient should use a multi-threaded connection manager
+   *
+   * @param httpSolrClientBuilder passed to {@code Builder.withHttpSolrClientBuilder}
+   * @param httpClient passed to {@code Builder.withHttpClient}
+   * @param solrClient the client forwarded to the delegate constructor
+   * @param solrServerUrl base URLs passed to {@code Builder.withBaseSolrUrls}
+   *
+   * @deprecated use {@link AsyncLBHttpSolrClient#AsyncLBHttpSolrClient(Builder, Http2SolrClient)} instead, as it is a more extension/subclassing-friendly alternative
+   */
+  @Deprecated
+  protected AsyncLBHttpSolrClient(Http2SolrClient.Builder httpSolrClientBuilder,
+      SolrInternalHttpClient httpClient, Http2SolrClient solrClient, String... solrServerUrl) {
+    this(new Builder()
+        .withHttpSolrClientBuilder(httpSolrClientBuilder)
+        .withHttpClient(httpClient)
+        .withBaseSolrUrls(solrServerUrl), solrClient);
+  }
+
+  /**
+   * Builds a {@link Builder} from the given arguments and delegates to
+   * {@link AsyncLBHttpSolrClient#AsyncLBHttpSolrClient(Builder, Http2SolrClient)}.
+   * The provided httpClient should use a multi-threaded connection manager
+   *
+   * @param httpClient passed to {@code Builder.withHttpClient}
+   * @param parser passed to {@code Builder.withResponseParser}
+   * @param solrClient the client forwarded to the delegate constructor
+   * @param solrServerUrl base URLs passed to {@code Builder.withBaseSolrUrls}
+   *
+   * @deprecated use {@link AsyncLBHttpSolrClient#AsyncLBHttpSolrClient(Builder, Http2SolrClient)} instead, as it is a more extension/subclassing-friendly alternative
+   */
+  @Deprecated
+  protected AsyncLBHttpSolrClient(SolrInternalHttpClient httpClient, ResponseParser parser, Http2SolrClient solrClient, String... solrServerUrl) {
+    this(new Builder()
+        .withBaseSolrUrls(solrServerUrl)
+        .withResponseParser(parser)
+        .withHttpClient(httpClient), solrClient);
+  }
+
+  /**
+   * Creates the client from a {@link Builder} and the {@link Http2SolrClient} that is
+   * used for every server. Each base URL of the builder is registered as an alive server.
+   *
+   * @param builder source of the http client, timeouts, response parser and base URLs
+   * @param solrClient the client used for all requests; must not be null
+   */
+  protected AsyncLBHttpSolrClient(Builder builder, Http2SolrClient solrClient) {
+    assert solrClient != null;
+    // must be set before makeSolrClient() is called below
+    this.solrClient = solrClient;
+
+    final boolean noClientSupplied = (builder.httpClient == null);
+    this.clientIsInternal = noClientSupplied;
+    //this.httpSolrClientBuilder = builder.httpSolrClientBuilder;
+    if (noClientSupplied) {
+      this.httpClient = new SolrInternalHttpClient(getClass().getSimpleName());
+    } else {
+      this.httpClient = builder.httpClient;
+    }
+
+    this.connectionTimeout = builder.connectionTimeoutMillis;
+    this.soTimeout = builder.socketTimeoutMillis;
+    this.parser = builder.responseParser;
+
+    for (String baseUrl : builder.baseSolrUrls) {
+      ServerWrapper server = new ServerWrapper(makeSolrClient(baseUrl), baseUrl);
+      aliveServers.put(server.getKey(), server);
+    }
+    updateAliveList();
+
+    this.builder = builder;
+  }
+
+//  private HttpClient constructClient(String[] solrServerUrl) {
+//    ModifiableSolrParams params = new ModifiableSolrParams();
+//    if (solrServerUrl != null && solrServerUrl.length > 1) {
+//      // we prefer retrying another server
+//      params.set(HttpClientUtil.PROP_USE_RETRY, false);
+//    } else {
+//      params.set(HttpClientUtil.PROP_USE_RETRY, true);
+//    }
+//    return HttpClientUtil.createClient(params);
+//  }
+
+  /**
+   * @return the set of param keys that are only sent via the query string
+   *         (the internal set itself, not a copy)
+   */
+  public Set<String> getQueryParams() {
+    return this.queryParams;
+  }
+
+  /**
+   * Expert Method. Replaces the current set of query-string-only param keys.
+   *
+   * @param queryParams set of param keys to only send via the query string
+   */
+  public void setQueryParams(Set<String> queryParams) {
+    this.queryParams = queryParams;
+  }
+  /**
+   * Adds one key to the set of params that are only sent via the query string.
+   *
+   * @param queryOnlyParam the param key to add
+   */
+  public void addQueryParams(String queryOnlyParam) {
+    queryParams.add(queryOnlyParam);
+  }
+
+  /**
+   * Strips a single trailing slash from a server URL, if present.
+   *
+   * @param server the server URL; must not be null
+   * @return {@code server} without its final {@code '/'}, or {@code server} unchanged
+   */
+  public static String normalize(String server) {
+    return server.endsWith("/") ? server.substring(0, server.length() - 1) : server;
+  }
+
+  /**
+   * Returns the client to use for the given server.
+   * <p>
+   * Currently the {@code server} argument is ignored and the single shared
+   * {@link Http2SolrClient} passed to the constructor is returned for every URL.
+   *
+   * @param server the server base URL (unused at present)
+   * @return the shared client
+   */
+  protected Http2SolrClient makeSolrClient(String server) {
+    // The disabled legacy implementation built one HttpSolrClient per server (from
+    // httpSolrClientBuilder or a fresh HttpSolrClient.Builder), applied connectionTimeout
+    // and soTimeout when non-null, and set requestWriter and queryParams on the result.
+    return this.solrClient;
+  }
+  
+  /** Simple holder for an {@link Http2SolrClient}. */
+  static class SolrClientWrapper {
+    private Http2SolrClient solrClient;
+
+    SolrClientWrapper(Http2SolrClient client) {
+      solrClient = client;
+    }
+  }
+
+  /**
+   * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
+   * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
+   * time, or until a test request on that server succeeds.
+   *
+   * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
+   * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
+   * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
+   *
+   * If no live servers are found a SolrServerException is thrown. The same exception (with a
+   * different message) is thrown when the request's time allowed is exceeded before a server succeeds.
+   *
+   * @param req contains both the request as well as the list of servers to query
+   *
+   * @return the result of the request
+   *
+   * @throws SolrServerException if no server handled the request, or the time allowed was exceeded
+   * @throws IOException If there is a low-level I/O error.
+   */
+  public Rsp request(Req req) throws SolrServerException, IOException {
+    Rsp rsp = new Rsp();
+    Exception ex = null;
+    // update requests and admin-path requests are treated as non-retryable by doRequest
+    boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
+    // zombie servers encountered in the first pass; retried in the second pass
+    List<ServerWrapper> skipped = null;
+
+    final Integer numServersToTry = req.getNumServersToTry();
+    int numServersTried = 0;
+
+    boolean timeAllowedExceeded = false;
+    long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
+    long timeOutTime = System.nanoTime() + timeAllowedNano;
+    for (String serverStr : req.getServers()) {
+      // assignment is intentional: remember whether the loop ended because of the deadline
+      if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+        break;
+      }
+      
+      serverStr = normalize(serverStr);
+      // if the server is currently a zombie, just skip to the next one
+      ServerWrapper wrapper = zombieServers.get(serverStr);
+      if (wrapper != null) {
+        // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
+        final int numDeadServersToTry = req.getNumDeadServersToTry();
+        if (numDeadServersToTry > 0) {
+          // collect at most numDeadServersToTry zombies for the second pass
+          if (skipped == null) {
+            skipped = new ArrayList<>(numDeadServersToTry);
+            skipped.add(wrapper);
+          }
+          else if (skipped.size() < numDeadServersToTry) {
+            skipped.add(wrapper);
+          }
+        }
+        continue;
+      }
+      try {
+        MDC.put("LBHttpSolrClient.url", serverStr);
+
+        // NOTE(review): the check uses '>' and runs before the increment below, so up to
+        // numServersToTry + 1 servers can be attempted - confirm this is intended.
+        if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+          break;
+        }
+
+        Http2SolrClient client = makeSolrClient(serverStr);
+        req.request.setBasePath(serverStr);
+
+        ++numServersTried;
+        // a non-null return value means "failed, try the next server"
+        ex = doRequest(client, req, rsp, isNonRetryable, false, null);
+        if (ex == null) {
+          return rsp; // SUCCESS
+        }
+      } finally {
+        MDC.remove("LBHttpSolrClient.url");
+      }
+    }
+
+    // try the servers we previously skipped
+    if (skipped != null) {
+      for (ServerWrapper wrapper : skipped) {
+        if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+          break;
+        }
+
+        if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+          break;
+        }
+
+        try {
+          MDC.put("LBHttpSolrClient.url", wrapper.baseUrl);
+          ++numServersTried;
+          req.request.setBasePath(wrapper.baseUrl);
+          // isZombie=true: doRequest's success callback removes the server from zombieServers
+          ex = doRequest(wrapper.solrClient, req, rsp, isNonRetryable, true, wrapper.getKey());
+          if (ex == null) {
+            return rsp; // SUCCESS
+          }
+        } finally {
+          MDC.remove("LBHttpSolrClient.url");
+        }
+      }
+    }
+
+
+    // nothing succeeded: build the failure message, preferring the timeout explanation
+    final String solrServerExceptionMessage;
+    if (timeAllowedExceeded) {
+      solrServerExceptionMessage = "Time allowed to handle this request exceeded";
+    } else {
+      if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+        solrServerExceptionMessage = "No live SolrServers available to handle this request:"
+            + " numServersTried="+numServersTried
+            + " numServersToTry="+numServersToTry.intValue();
+      } else {
+        solrServerExceptionMessage = "No live SolrServers available to handle this request";
+      }
+    }
+    if (ex == null) {
+      throw new SolrServerException(solrServerExceptionMessage);
+    } else {
+      // include the current zombie keys and the last failure as the cause
+      throw new SolrServerException(solrServerExceptionMessage+":" + zombieServers.keySet(), ex);
+    }
+
+  }
+
+  /**
+   * Registers the given server as a non-standard zombie and makes sure the
+   * alive-check executor is started.
+   *
+   * @param server the client used for the server
+   * @param url the server's base URL, used as the zombie map key
+   * @param e the failure that caused the server to be marked dead
+   * @return {@code e}, unchanged, so callers can assign it in one expression
+   */
+  protected Exception addZombie(Http2SolrClient server, String url, Exception e) {
+    final ServerWrapper zombie = new ServerWrapper(server, url);
+    zombie.standard = false;
+
+    zombieServers.put(zombie.getKey(), zombie);
+    startAliveCheckExecutor();
+
+    return e;
+  }
+
+  /**
+   * Submits the request through {@code client.abortableRequest} and classifies any exception
+   * thrown by that call.
+   *
+   * @param client the client to send the request with
+   * @param req the request wrapper; {@code req.request}'s base path is recorded as the responding server
+   * @param rsp populated with the server, the abortable handle and (from the success callback) the response
+   * @param isNonRetryable when true, only connect failures cause the server to be marked dead and the
+   *        exception to be returned; other failures are rethrown
+   * @param isZombie whether the target server is currently in the zombie map
+   * @param zombieKey key of the server in the zombie map; only used when {@code isZombie} is true
+   * @return {@code null} if the call did not throw, otherwise the exception after which the caller
+   *         may try another server
+   * @throws SolrServerException for failures that should not be retried on another server
+   * @throws IOException for socket failures that should not be retried on another server
+   */
+  protected Exception doRequest(Http2SolrClient client, Req req, Rsp rsp, boolean isNonRetryable,
+      boolean isZombie, String zombieKey) throws SolrServerException, IOException {
+    Exception ex = null;
+    try {
+      assert client != null;
+      assert req != null;
+      assert rsp != null;
+      rsp.server = req.request.getBasePath();
+     // rsp.rsp =
+      // NOTE(review): presumably the callback runs asynchronously, in which case rsp.rsp may still
+      // be unset when this method returns - confirm against Http2SolrClient.abortableRequest.
+      rsp.areq = client.abortableRequest(req.getRequest(), new OnComplete<SolrResponseBase>() {
+
+        @Override
+        public void onSuccess(SolrResponseBase result) {
+          // nocommit
+          
+          // a zombie that answered with status 200 is no longer considered dead
+          if (isZombie && result.getStatus() == 200) {
+            zombieServers.remove(zombieKey);
+          }
+          
+          rsp.rsp = result.getResponse();
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          // NOTE(review): the failure is only printed; it is not reported to the caller and the
+          // server is not marked dead here.
+          e.printStackTrace();
+        }});
+
+    } catch (RemoteExecutionException e){
+      throw e;
+    } catch(SolrException e) {
+      // we retry on 404 or 403 or 503 or 500
+      // unless it's an update - then we only retry on connect exception
+      if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
+        // already-dead servers are not re-added to the zombie map
+        ex = (!isZombie) ? addZombie(client, rsp.server, e) : e;
+      } else {
+        // Server is alive but the request was likely malformed or invalid
+        if (isZombie) {
+          zombieServers.remove(zombieKey);
+        }
+        throw e;
+      }
+    } catch (SocketException e) {
+      // ConnectException is retried even for non-retryable requests
+      if (!isNonRetryable || e instanceof ConnectException) {
+        ex = (!isZombie) ? addZombie(client, rsp.server, e) : e;
+      } else {
+        throw e;
+      }
+    } catch (SocketTimeoutException e) {
+      if (!isNonRetryable) {
+        ex = (!isZombie) ? addZombie(client, rsp.server, e) : e;
+      } else {
+        throw e;
+      }
+    } catch (SolrServerException e) {
+      // classify by root cause: any IOException for retryable requests,
+      // only ConnectException for non-retryable ones
+      Throwable rootCause = e.getRootCause();
+      if (!isNonRetryable && rootCause instanceof IOException) {
+        ex = (!isZombie) ? addZombie(client, rsp.server, e) : e;
+      } else if (isNonRetryable && rootCause instanceof ConnectException) {
+        ex = (!isZombie) ? addZombie(client, rsp.server, e) : e;
+      } else {
+        throw e;
+      }
+    } catch (Exception e) {
+      throw new SolrServerException(e);
+    }
+
+    return ex;
+  }
+
+  /** Rebuilds the volatile {@code aliveServerList} snapshot from {@code aliveServers}. */
+  private void updateAliveList() {
+    synchronized (aliveServers) {
+      ServerWrapper[] snapshot = new ServerWrapper[aliveServers.size()];
+      aliveServerList = aliveServers.values().toArray(snapshot);
+    }
+  }
+
+  /**
+   * Removes the server with the given key from the alive map and refreshes the snapshot
+   * array if something was removed.
+   *
+   * @return the removed wrapper, or {@code null} if the key was not present
+   */
+  private ServerWrapper removeFromAlive(String key) {
+    synchronized (aliveServers) {
+      final ServerWrapper removed = aliveServers.remove(key);
+      if (removed == null) {
+        return null;
+      }
+      updateAliveList();
+      return removed;
+    }
+  }
+
+  /** Puts the wrapper into the alive map under its key and refreshes the snapshot array. */
+  private void addToAlive(ServerWrapper wrapper) {
+    synchronized (aliveServers) {
+      // TODO: warn if there was a previous entry?
+      aliveServers.put(wrapper.getKey(), wrapper);
+      updateAliveList();
+    }
+  }
+
+  /**
+   * Registers a server as alive. The given string is used as-is as the server's key.
+   *
+   * @param server base URL of the server to add
+   * @throws MalformedURLException declared for callers; not thrown by the current body
+   */
+  public void addSolrServer(String server) throws MalformedURLException {
+    addToAlive(new ServerWrapper(makeSolrClient(server), server));
+  }
+
+  /**
+   * Removes a server from both the alive and the zombie pool. The URL is converted to its
+   * external form and a single trailing slash is dropped before it is used as the key.
+   *
+   * @param server URL of the server to remove
+   * @return always {@code null}
+   * @throws RuntimeException wrapping a {@link MalformedURLException} if {@code server} is not a valid URL
+   */
+  public String removeSolrServer(String server) {
+    final String externalForm;
+    try {
+      externalForm = new URL(server).toExternalForm();
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+    final String key = normalize(externalForm);
+
+    // there is a small race condition here - if the server is in the process of being moved between
+    // lists, we could fail to remove it.
+    removeFromAlive(key);
+    zombieServers.remove(key);
+    return null;
+  }
+
+  /**
+   * Records the connection timeout on this client.
+   * <p>
+   * Propagating the value to the per-server clients is currently disabled, so only the
+   * {@code connectionTimeout} field is updated.
+   *
+   * @param timeout the connection timeout, in the same unit as the builder's
+   *        {@code connectionTimeoutMillis} that initialises the field
+   * @deprecated since 7.0  Use {@link Builder} methods instead. 
+   */
+  @Deprecated
+  public void setConnectionTimeout(int timeout) {
+    this.connectionTimeout = timeout;
+    // The former loops over aliveServers and zombieServers called hasNext() without ever calling
+    // next() (the call was commented out), so they never terminated once a server was registered,
+    // the first one while holding the aliveServers lock. They did no work, so they are dropped
+    // until per-server propagation is restored:
+    //   wrapper.client.setConnectionTimeout(timeout);
+  }
+
+  /**
+   * set soTimeout (read timeout) on the underlying HttpConnectionManager. This is desirable for queries, but probably
+   * not for indexing.
+   * <p>
+   * Propagating the value to the per-server clients is currently disabled, so only the
+   * {@code soTimeout} field is updated.
+   *
+   * @param timeout the socket timeout, in the same unit as the builder's
+   *        {@code socketTimeoutMillis} that initialises the field
+   * @deprecated since 7.0  Use {@link Builder} methods instead. 
+   */
+  @Deprecated
+  public void setSoTimeout(int timeout) {
+    this.soTimeout = timeout;
+    // The former loops over aliveServers and zombieServers called hasNext() without ever calling
+    // next() (the call was commented out), so they never terminated once a server was registered,
+    // the first one while holding the aliveServers lock. They did no work, so they are dropped
+    // until per-server propagation is restored:
+    //   wrapper.client.setSoTimeout(timeout);
+  }
+
+  /**
+   * Stops the alive-check executor, if one was started, and closes the http client when it
+   * was created by this instance rather than supplied through the builder.
+   */
+  @Override
+  public void close() {
+    if (aliveCheckExecutor != null) {
+      aliveCheckExecutor.shutdownNow();
+    }
+
+    // The disabled legacy code additionally closed every per-server client held in
+    // clients, zombieServers and aliveServers; those clients are not closed here.
+    if (!clientIsInternal) {
+      return;
+    }
+    IOUtils.closeQuietly(httpClient);
+  }
+
+  /**
+   * Tries to query a live server. A SolrServerException is thrown if all servers are dead.
+   * If the request failed due to IOException then the live server is moved to dead pool and the request is
+   * retried on another live server.  After live servers are exhausted, any servers previously marked as dead
+   * will be tried before failing the request.
+   * <p>
+   * Equivalent to calling the three-argument overload with a {@code null} limit on the
+   * number of servers to try.
+   *
+   * @param request the SolrRequest.
+   * @param collection the collection to send the request to
+   *
+   * @return response
+   *
+   * @throws SolrServerException if no server could handle the request
+   * @throws IOException If there is a low-level I/O error.
+   */
+  @Override
+  public NamedList<Object> request(final SolrRequest request, String collection)
+          throws SolrServerException, IOException {
+    final Integer noLimit = null;
+    return request(request, collection, noLimit);
+  }
+
+  /**
+   * Sends the request to the alive servers in round-robin order and, if none of them handles it,
+   * to the "standard" servers in the zombie list that did not fail during this call.
+   * <p>
+   * An alive server whose failure has an {@link IOException} as root cause is moved to the dead
+   * pool and the next server is tried. A zombie server that answers is moved back to the alive list.
+   *
+   * @param request the SolrRequest
+   * @param collection the collection to send the request to
+   * @param numServersToTry maximum number of round-robin attempts on alive servers, or
+   *        {@code null} to attempt as many times as there are alive servers
+   *
+   * @return response
+   *
+   * @throws SolrServerException if no server handled the request, the time allowed was exceeded,
+   *         or a server failed for a reason other than an I/O error
+   * @throws IOException If there is a low-level I/O error.
+   */
+  public NamedList<Object> request(final SolrRequest request, String collection,
+      final Integer numServersToTry) throws SolrServerException, IOException {
+    Exception ex = null;
+    // work on a snapshot; the volatile array may be replaced concurrently
+    ServerWrapper[] serverList = aliveServerList;
+
+    // With no alive servers there is nothing to round-robin over. Skip the first pass entirely
+    // instead of evaluating "count % 0" below, which would throw ArithmeticException whenever
+    // an explicit numServersToTry > 0 was given, and fall through to the zombie servers.
+    final int maxTries;
+    if (serverList.length == 0) {
+      maxTries = 0;
+    } else {
+      maxTries = (numServersToTry == null ? serverList.length : numServersToTry.intValue());
+    }
+    int numServersTried = 0;
+    // alive servers that failed during this call; not retried in the zombie pass
+    Map<String,ServerWrapper> justFailed = null;
+
+    boolean timeAllowedExceeded = false;
+    long timeAllowedNano = getTimeAllowedInNanos(request);
+    long timeOutTime = System.nanoTime() + timeAllowedNano;
+    for (int attempts=0; attempts<maxTries; attempts++) {
+      // assignment is intentional: remember whether the loop ended because of the deadline
+      if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+        break;
+      }
+
+      // mask the sign bit so the index stays non-negative after the counter overflows
+      int count = counter.incrementAndGet() & Integer.MAX_VALUE;
+      ServerWrapper wrapper = serverList[count % serverList.length];
+
+      try {
+        ++numServersTried;
+        return wrapper.request(request, collection);
+      } catch (SolrException e) {
+        // Server is alive but the request was malformed or invalid
+        throw e;
+      } catch (SolrServerException e) {
+        if (e.getRootCause() instanceof IOException) {
+          ex = e;
+          moveAliveToDead(wrapper);
+          if (justFailed == null) justFailed = new HashMap<>();
+          justFailed.put(wrapper.getKey(), wrapper);
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        throw new SolrServerException(e);
+      }
+    }
+
+    // try other standard servers that we didn't try just now
+    for (ServerWrapper wrapper : zombieServers.values()) {
+      if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+        break;
+      }
+
+      if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue;
+      try {
+        ++numServersTried;
+        NamedList<Object> rsp = wrapper.request(request, collection);
+        // remove from zombie list *before* adding to alive to avoid a race that could lose a server
+        zombieServers.remove(wrapper.getKey());
+        addToAlive(wrapper);
+        return rsp;
+      } catch (SolrException e) {
+        // Server is alive but the request was malformed or invalid
+        throw e;
+      } catch (SolrServerException e) {
+        if (e.getRootCause() instanceof IOException) {
+          ex = e;
+          // still dead
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        throw new SolrServerException(e);
+      }
+    }
+
+
+    // nothing succeeded: build the failure message, preferring the timeout explanation
+    final String solrServerExceptionMessage;
+    if (timeAllowedExceeded) {
+      solrServerExceptionMessage = "Time allowed to handle this request exceeded";
+    } else {
+      if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+        solrServerExceptionMessage = "No live SolrServers available to handle this request:"
+            + " numServersTried="+numServersTried
+            + " numServersToTry="+numServersToTry.intValue();
+      } else {
+        solrServerExceptionMessage = "No live SolrServers available to handle this request";
+      }
+    }
+    if (ex == null) {
+      throw new SolrServerException(solrServerExceptionMessage);
+    } else {
+      throw new SolrServerException(solrServerExceptionMessage, ex);
+    }
+  }
+  
+  /**
+   * Reads the {@link CommonParams#TIME_ALLOWED} parameter of the request, interprets it as milliseconds
+   * and converts it to nanoseconds.
+   *
+   * @param req the request whose parameters are inspected
+   * @return time allowed in nanos, or -1 if the request has no parameters or specifies no (or a
+   *         negative) time allowed
+   */
+  private long getTimeAllowedInNanos(final SolrRequest req) {
+    SolrParams reqParams = req.getParams();
+    if (reqParams == null) {
+      return -1;
+    }
+    int timeAllowedMillis = reqParams.getInt(CommonParams.TIME_ALLOWED, -1);
+    // Convert real limits only: converting the -1 "not specified" default would yield -1000000
+    // instead of the documented -1.
+    return timeAllowedMillis < 0 ? -1
+        : TimeUnit.NANOSECONDS.convert(timeAllowedMillis, TimeUnit.MILLISECONDS);
+  }
+  
+  /**
+   * Tells whether the deadline of a request has passed.
+   *
+   * @param timeAllowedNano the time allowed in nanoseconds; a non-positive value means no limit applies
+   * @param timeOutTime the deadline, expressed on the {@link System#nanoTime()} time line
+   * @return {@code true} if a limit applies and the current nano time is past the deadline
+   */
+  private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
+    // nanoTime values may wrap around, so compare through the difference instead of using '>' directly
+    return timeAllowedNano > 0 && System.nanoTime() - timeOutTime > 0;
+  }
+  
+  /**
+   * Pings one server from the zombie (dead) list and, if it answers, takes it out of that list again.
+   * The alive-check runner calls this for every zombie each time it fires; it is scheduled every
+   * {@code interval} milliseconds, which is set through setAliveCheckInterval() and defaults to 1 minute.
+   *
+   * A server that answers with status 0 is removed from the zombie list, gets its failed-ping counter
+   * reset and, if it is a standard server, is added back to the alive list. Any exception counts as a
+   * failed ping; a non-standard server is dropped from the zombie list once it has reached
+   * NONSTANDARD_PING_LIMIT failed pings.
+   *
+   * @param zombieServer a server in the dead pool
+   */
+  private void checkAZombieServer(ServerWrapper zombieServer) {
+    try {
+      SolrQuery solrQuery = new SolrQuery("*:*");
+      solrQuery.setRows(0);
+      /*
+       * Default sort (if we don't supply a sort) is by score and since
+       * we request 0 rows any sorting and scoring is not necessary.
+       * SolrQuery.DOCID schema-independently specifies a non-scoring sort.
+       * <code>_docid_ asc</code> sort is efficient,
+       * <code>_docid_ desc</code> sort is not, so choose ascending DOCID sort.
+       */
+      solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc);
+      // not a top-level request: we are interested only in the server being sent to, i.e. it need not
+      // distribute our request to further servers
+      solrQuery.setDistrib(false);
+      QueryRequest request = new QueryRequest(solrQuery);
+      QueryResponse resp = request.process(zombieServer.solrClient);
+      //QueryResponse resp = zombieServer.solrClient.query(solrQuery);
+      if (resp.getStatus() == 0) {
+        // server has come back up.
+        // make sure to remove from zombies before adding to alive to avoid a race condition
+        // where another thread could mark it down, move it back to zombie, and then we delete
+        // from zombie and lose it forever.
+        ServerWrapper wrapper = zombieServers.remove(zombieServer.getKey());
+        if (wrapper != null) {
+          wrapper.failedPings = 0;
+          if (wrapper.standard) {
+            addToAlive(wrapper);
+          }
+        } else {
+          // something else already moved the server from zombie to alive
+        }
+      }
+    } catch (Exception e) {
+      // Expected. The server is still down.
+      zombieServer.failedPings++;
+
+      // If the server doesn't belong in the standard set belonging to this load balancer
+      // then simply drop it after a certain number of failed pings.
+      if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
+        zombieServers.remove(zombieServer.getKey());
+      }
+    }
+  }
+
+  /**
+   * Takes the given server out of the alive list and registers it as a zombie, making sure the
+   * alive-check executor is running. Does nothing when the server is no longer in the alive list.
+   *
+   * @param wrapper the server that just failed
+   */
+  private void moveAliveToDead(ServerWrapper wrapper) {
+    final ServerWrapper removed = removeFromAlive(wrapper.getKey());
+    if (removed != null) {
+      zombieServers.put(removed.getKey(), removed);
+      startAliveCheckExecutor();
+    }
+    // removed == null: another thread already detected the failure and removed it
+  }
+
+  // delay between two alive checks, in milliseconds
+  private int interval = CHECK_INTERVAL;
+
+  /**
+   * Sets the fixed interval at which the dead servers are pinged to find out whether they are alive
+   * again.
+   *
+   * @param interval time in milliseconds, must be positive
+   * @throws IllegalArgumentException if {@code interval} is zero or negative
+   */
+  public void setAliveCheckInterval(int interval) {
+    if (interval > 0) {
+      this.interval = interval;
+      return;
+    }
+    throw new IllegalArgumentException(
+        "Alive check interval must be positive, specified value = " + interval);
+  }
+
+  /**
+   * Creates the single-threaded scheduled executor for the alive check on first use and schedules the
+   * check at a fixed rate of {@code interval} milliseconds, with an initial delay of the same length.
+   * Does nothing when the executor already exists.
+   */
+  private void startAliveCheckExecutor() {
+    // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
+    // if it's not null.
+    // NOTE(review): the declaration of aliveCheckExecutor is outside this view - confirm it is volatile.
+    if (aliveCheckExecutor == null) {
+      synchronized (this) {
+        if (aliveCheckExecutor == null) {
+          aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(
+              new SolrjNamedThreadFactory("aliveCheckExecutor"));
+          // the runner only gets a weak reference, so the scheduled task holds no strong reference
+          // to this client
+          aliveCheckExecutor.scheduleAtFixedRate(
+                  getAliveCheckRunner(new WeakReference<>(this)),
+                  this.interval, this.interval, TimeUnit.MILLISECONDS);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the task that checks every server currently in the zombie list of the referenced client.
+   * The task is a no-op once the weak reference has been cleared or when the client has no zombie list.
+   *
+   * @param lbRef weak reference to the load-balancing client whose zombies are checked
+   * @return the task to schedule
+   */
+  private static Runnable getAliveCheckRunner(final WeakReference<AsyncLBHttpSolrClient> lbRef) {
+    return () -> {
+      final AsyncLBHttpSolrClient client = lbRef.get();
+      if (client == null || client.zombieServers == null) {
+        return;
+      }
+      for (ServerWrapper zombie : client.zombieServers.values()) {
+        client.checkAZombieServer(zombie);
+      }
+    };
+  }
+
+  /**
+   * Returns the {@link SolrInternalHttpClient} this instance uses.
+   *
+   * @return the value of the {@code httpClient} field
+   */
+  public SolrInternalHttpClient getHttpClient() {
+    return httpClient;
+  }
+
+  /**
+   * Returns the {@link ResponseParser} currently set on this client.
+   *
+   * @return the value of the {@code parser} field
+   */
+  public ResponseParser getParser() {
+    return parser;
+  }
+
+  /**
+   * Changes the {@link ResponseParser} of this client.
+   * NOTE(review): the previous text said the parser is used for the internal SolrServer objects; that
+   * use is not visible from this method - confirm.
+   *
+   * @param parser Default Response Parser chosen to parse the response if the parser
+   *               was not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(ResponseParser parser) {
+    this.parser = parser;
+  }
+
+  /**
+   * Changes the {@link RequestWriter} of this client.
+   * NOTE(review): the previous text said the writer is used for the internal SolrServer objects; that
+   * use is not visible from this method - confirm.
+   *
+   * @param requestWriter Default RequestWriter, used to encode requests sent to the server.
+   */
+  public void setRequestWriter(RequestWriter requestWriter) {
+    this.requestWriter = requestWriter;
+  }
+  
+  /**
+   * Returns the {@link RequestWriter} currently set on this client.
+   *
+   * @return the value of the {@code requestWriter} field
+   */
+  public RequestWriter getRequestWriter() {
+    return requestWriter;
+  }
+  
+  /**
+   * Shuts the alive-check executor down, if one was started, when this client is finalized.
+   */
+  @Override
+  protected void finalize() throws Throwable {
+    try {
+      if (this.aliveCheckExecutor != null) {
+        this.aliveCheckExecutor.shutdownNow();
+      }
+    } finally {
+      super.finalize();
+    }
+  }
+
+  // defaults
+  // default delay between two alive checks, in milliseconds (1 minute)
+  private static final int CHECK_INTERVAL = 60 * 1000;
+  // number of failed pings after which a dead server that is not in the server list is dropped
+  private static final int NONSTANDARD_PING_LIMIT = 5;
+
+  /**
+   * Constructs {@link AsyncLBHttpSolrClient} instances from provided configuration.
+   */
+  public static class Builder extends SolrClientBuilder<Builder> {
+    // Solr endpoints collected through withBaseSolrUrl / withBaseSolrUrls
+    protected final List<String> baseSolrUrls;
+    // optional builder for the internally used clients, see withHttpSolrClientBuilder
+    protected Http2SolrClient.Builder httpSolrClientBuilder;
+    // headers collected through withHeader / solrInternal
+    protected Map<String,String> headers = new HashMap<>();
+    // optional client handed to the AsyncLBHttpSolrClient constructor, see withHttp2SolrClient
+    private Http2SolrClient solrClient;
+
+    /**
+     * Creates a builder with no endpoints and a {@link BinaryResponseParser} as response parser.
+     */
+    public Builder() {
+      this.baseSolrUrls = new ArrayList<>();
+      this.responseParser = new BinaryResponseParser();
+    }
+
+    /**
+     * Returns the {@link Http2SolrClient.Builder} set through
+     * {@link #withHttpSolrClientBuilder(Http2SolrClient.Builder)}, or {@code null} if none was set.
+     */
+    public Http2SolrClient.Builder getHttpSolrClientBuilder() {
+      return httpSolrClientBuilder;
+    }
+   
+    /**
+     * Provide a Solr endpoint to be used when configuring {@link AsyncLBHttpSolrClient} instances.
+     *
+     * Method may be called multiple times.  All provided values will be used.
+     *
+     * Two different paths can be specified as a part of the URL:
+     *
+     * 1) A path pointing directly at a particular core
+     * <pre>
+     *   SolrClient client = builder.withBaseSolrUrl("http://my-solr-server:8983/solr/core1").build();
+     *   QueryResponse resp = client.query(new SolrQuery("*:*"));
+     * </pre>
+     * Note that when a core is provided in the base URL, queries and other requests can be made without mentioning the
+     * core explicitly.  However, the client can only send requests to that core.
+     *
+     * 2) The path of the root Solr path ("/solr")
+     * <pre>
+     *   SolrClient client = builder.withBaseSolrUrl("http://my-solr-server:8983/solr").build();
+     *   QueryResponse resp = client.query("core1", new SolrQuery("*:*"));
+     * </pre>
+     * In this case the client is more flexible and can be used to send requests to any cores.  This flexibility though
+     * requires that the core is specified on all requests.
+     *
+     * @param baseSolrUrl the endpoint to add
+     * @return this builder
+     */
+    public Builder withBaseSolrUrl(String baseSolrUrl) {
+      this.baseSolrUrls.add(baseSolrUrl);
+      return this;
+    }
+ 
+    /**
+     * Provide Solr endpoints to be used when configuring {@link AsyncLBHttpSolrClient} instances.
+     *
+     * Method may be called multiple times.  All provided values will be used.
+     *
+     * Two different paths can be specified as a part of each URL:
+     *
+     * 1) A path pointing directly at a particular core
+     * <pre>
+     *   SolrClient client = builder.withBaseSolrUrls("http://my-solr-server:8983/solr/core1").build();
+     *   QueryResponse resp = client.query(new SolrQuery("*:*"));
+     * </pre>
+     * Note that when a core is provided in the base URL, queries and other requests can be made without mentioning the
+     * core explicitly.  However, the client can only send requests to that core.
+     *
+     * 2) The path of the root Solr path ("/solr")
+     * <pre>
+     *   SolrClient client = builder.withBaseSolrUrls("http://my-solr-server:8983/solr").build();
+     *   QueryResponse resp = client.query("core1", new SolrQuery("*:*"));
+     * </pre>
+     * In this case the client is more flexible and can be used to send requests to any cores.  This flexibility though
+     * requires that the core is specified on all requests.
+     *
+     * @param solrUrls the endpoints to add, in the given order
+     * @return this builder
+     */
+    public Builder withBaseSolrUrls(String... solrUrls) {
+      for (String baseSolrUrl : solrUrls) {
+        this.baseSolrUrls.add(baseSolrUrl);
+      }
+      return this;
+    }
+
+    /**
+     * Provides a {@link Http2SolrClient.Builder} to be used for building the internally used clients.
+     *
+     * @param builder the client builder to remember
+     * @return this builder
+     */
+    public Builder withHttpSolrClientBuilder(Http2SolrClient.Builder builder) {
+      this.httpSolrClientBuilder = builder;
+      return this;
+    }
+    
+    /**
+     * Provides the {@link Http2SolrClient} that {@link #build()} hands to the
+     * {@link AsyncLBHttpSolrClient} constructor.
+     *
+     * @param solrClient the client to pass on
+     * @return this builder
+     */
+    public Builder withHttp2SolrClient(Http2SolrClient solrClient) {
+      this.solrClient = solrClient;
+      return this;
+    }
+    
+    /**
+     * Records a header; a value recorded earlier under the same header name is replaced.
+     *
+     * @param header the header name
+     * @param value the header value
+     * @return this builder
+     */
+    public Builder withHeader(String header, String value) {
+      this.headers.put(header, value);
+      return this;
+    }
+    
+    /**
+     * Records the {@code QoSParams.REQUEST_SOURCE} header with the value {@code QoSParams.INTERNAL}.
+     *
+     * @return this builder
+     */
+    public Builder solrInternal() {
+      this.headers.put(QoSParams.REQUEST_SOURCE, QoSParams.INTERNAL);
+      return this;
+    }
+
+    /**
+     * Create an {@link AsyncLBHttpSolrClient} based on provided configuration.
+     */
+    public AsyncLBHttpSolrClient build() {
+      return new AsyncLBHttpSolrClient(this, solrClient);
+    }
+
+    @Override
+    public Builder getThis() {
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
index 542c876..683fdc2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.solr.client.solrj.impl;
 
+import static org.apache.solr.common.params.CommonParams.JAVABIN_MIME;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -27,8 +29,6 @@ import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.util.ContentStream;
 
-import static org.apache.solr.common.params.CommonParams.JAVABIN_MIME;
-
 /**
  * A RequestWriter which writes requests in the javabin format
  *

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryResponseParser.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryResponseParser.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryResponseParser.java
index 18baf03..049073d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryResponseParser.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryResponseParser.java
@@ -16,15 +16,15 @@
  */
 package org.apache.solr.client.solrj.impl;
 
-import org.apache.solr.client.solrj.ResponseParser;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.JavaBinCodec;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.NamedList;
+
 /**
  *
  * @since solr 1.3

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 34d789f..bc136b5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -16,6 +16,9 @@
  */
 package org.apache.solr.client.solrj.impl;
 
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
+import static org.apache.solr.common.params.CommonParams.ID;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.ConnectException;
@@ -44,7 +47,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.NoHttpResponseException;
-import org.apache.http.client.HttpClient;
 import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrClient;
@@ -57,6 +59,7 @@ import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.client.solrj.util.SolrInternalHttpClient;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputDocument;
@@ -78,18 +81,17 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
+import org.eclipse.jetty.client.HttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
-import static org.apache.solr.common.params.CommonParams.ID;
-
 /**
  * SolrJ client class to communicate with SolrCloud.
  * Instances of this class communicate with Zookeeper to discover
@@ -109,7 +111,7 @@ public class CloudSolrClient extends SolrClient {
   private volatile String defaultCollection;
   private final LBHttpSolrClient lbClient;
   private final boolean shutdownLBHttpSolrServer;
-  private HttpClient myClient;
+  private SolrInternalHttpClient httpClient;
   private final boolean clientIsInternal;
   //no of times collection state to be reloaded if stale state error is received
   private static final int MAX_STALE_RETRIES = 5;
@@ -270,15 +272,15 @@ public class CloudSolrClient extends SolrClient {
     } else {
       this.stateProvider = builder.stateProvider;
     }
-    this.clientIsInternal = builder.httpClient == null;
     this.shutdownLBHttpSolrServer = builder.loadBalancedSolrClient == null;
     if(builder.lbClientBuilder != null) {
       propagateLBClientConfigOptions(builder);
       builder.loadBalancedSolrClient = builder.lbClientBuilder.build();
     }
     if(builder.loadBalancedSolrClient != null) builder.httpClient = builder.loadBalancedSolrClient.getHttpClient();
-    this.myClient = (builder.httpClient == null) ? HttpClientUtil.createClient(null) : builder.httpClient;
-    if (builder.loadBalancedSolrClient == null) builder.loadBalancedSolrClient = createLBHttpSolrClient(builder, myClient);
+    this.clientIsInternal = builder.httpClient == null;
+    this.httpClient = (builder.httpClient == null) ? new SolrInternalHttpClient(getClass().getSimpleName()) : builder.httpClient;
+    if (builder.loadBalancedSolrClient == null) builder.loadBalancedSolrClient = createLBHttpSolrClient(builder, httpClient);
     this.lbClient = builder.loadBalancedSolrClient;
     this.updatesToLeaders = builder.shardLeadersOnly;
     this.parallelUpdates = builder.parallelUpdates;
@@ -295,6 +297,10 @@ public class CloudSolrClient extends SolrClient {
     if (builder.socketTimeoutMillis != null) {
       lbBuilder.withSocketTimeout(builder.socketTimeoutMillis);
     }
+    
+    if (builder.httpClient != null) {
+      lbBuilder.withHttpClient(builder.httpClient);
+    }
   }
 
   /** Sets the cache ttl for DocCollection Objects cached  . This is only applicable for collections which are persisted outside of clusterstate.json
@@ -454,7 +460,7 @@ public class CloudSolrClient extends SolrClient {
    * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
    *
    * Note that the watcher is unregistered after it has been called once.  To make a watcher persistent,
-   * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
+   * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(boolean, Set, DocCollection)}
    * call
    *
    * @param collection the collection to watch
@@ -1133,8 +1139,8 @@ public class CloudSolrClient extends SolrClient {
       lbClient.close();
     }
     
-    if (clientIsInternal && myClient!=null) {
-      HttpClientUtil.close(myClient);
+    if (clientIsInternal && httpClient!=null) {
+      IOUtils.closeQuietly(httpClient);
     }
 
     if(this.threadPool != null && !this.threadPool.isShutdown()) {
@@ -1146,8 +1152,8 @@ public class CloudSolrClient extends SolrClient {
     return lbClient;
   }
 
-  public HttpClient getHttpClient() {
-    return myClient;
+  public SolrInternalHttpClient getHttpClient() {
+    return httpClient;
   }
   
   public boolean isUpdatesToLeaders() {
@@ -1329,15 +1335,17 @@ public class CloudSolrClient extends SolrClient {
     return true;
   }
 
-  private static LBHttpSolrClient createLBHttpSolrClient(Builder cloudSolrClientBuilder, HttpClient httpClient) {
+  private static LBHttpSolrClient createLBHttpSolrClient(Builder cloudSolrClientBuilder, SolrInternalHttpClient httpClient) {
     final LBHttpSolrClient.Builder lbBuilder = new LBHttpSolrClient.Builder();
-    lbBuilder.withHttpClient(httpClient);
     if (cloudSolrClientBuilder.connectionTimeoutMillis != null) {
       lbBuilder.withConnectionTimeout(cloudSolrClientBuilder.connectionTimeoutMillis);
     }
     if (cloudSolrClientBuilder.socketTimeoutMillis != null) {
       lbBuilder.withSocketTimeout(cloudSolrClientBuilder.socketTimeoutMillis);
     }
+    if (httpClient != null) {
+      lbBuilder.withHttpClient(httpClient);
+    }
     final LBHttpSolrClient lbClient = lbBuilder.build();
     lbClient.setRequestWriter(new BinaryRequestWriter());
     lbClient.setParser(new BinaryResponseParser());
@@ -1358,6 +1366,7 @@ public class CloudSolrClient extends SolrClient {
     protected boolean directUpdatesToLeadersOnly = false;
     protected boolean parallelUpdates = true;
     protected ClusterStateProvider stateProvider;
+    protected SolrInternalHttpClient httpClient;
     
     /**
      * @deprecated use other constructors instead.  This constructor will be changing visibility in an upcoming release.
@@ -1522,6 +1531,11 @@ public class CloudSolrClient extends SolrClient {
       this.parallelUpdates = parallelUpdates;
       return this;
     }
+    
+    public Builder withHttpClient(SolrInternalHttpClient httpClient) {
+      this.httpClient = httpClient;
+      return this;
+    }
 
     /**
      * Expert feature where you want to implement a custom cluster discovery mechanism of the solr nodes as part of the
@@ -1563,4 +1577,4 @@ public class CloudSolrClient extends SolrClient {
       return this;
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index 5845e7f..0a154bc 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -106,17 +106,18 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
                                        ExecutorService es, boolean streamDeletes) {
     this((streamDeletes) ?
         new Builder(solrServerUrl)
-        .withHttpClient(client)
+        //.withHttpClient(client)
         .withQueueSize(queueSize)
         .withThreadCount(threadCount)
         .withExecutorService(es)
         .alwaysStreamDeletes() :
           new Builder(solrServerUrl)
-          .withHttpClient(client)
+          //.withHttpClient(client)
           .withQueueSize(queueSize)
           .withThreadCount(threadCount)
           .withExecutorService(es)
           .neverStreamDeletes());
+    // nocommit
   }
   
   protected ConcurrentUpdateSolrClient(Builder builder) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/74a9b54c/solr/solrj/src/java/org/apache/solr/client/solrj/impl/DelegationTokenHttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/DelegationTokenHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/DelegationTokenHttpSolrClient.java
index e878110..2e1b679 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/DelegationTokenHttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/DelegationTokenHttpSolrClient.java
@@ -36,7 +36,7 @@ public class DelegationTokenHttpSolrClient extends HttpSolrClient {
 
   /**
    * Package protected constructor for use by 
-   * {@linkplain org.apache.solr.client.solrj.impl.HttpSolrClient.Builder}.
+   * {@linkplain org.apache.solr.client.solrj.impl.Http2SolrClient.Builder}.
    * @lucene.internal
    * 
    * @deprecated use {@link DelegationTokenHttpSolrClient#DelegationTokenHttpSolrClient(HttpSolrClient.Builder)} instead, as it is a more
@@ -64,7 +64,7 @@ public class DelegationTokenHttpSolrClient extends HttpSolrClient {
 
   /**
    * This constructor is defined at "protected" scope. Ideally applications should
-   * use {@linkplain org.apache.solr.client.solrj.impl.HttpSolrClient.Builder} instance
+   * use {@linkplain org.apache.solr.client.solrj.impl.Http2SolrClient.Builder} instance
    * to configure this Solr client instance.
    *
    * @param baseURL The base url to communicate with the Solr server


Mime
View raw message