ranger-commits mailing list archives

From bo...@apache.org
Subject [14/17] incubator-ranger git commit: Support for Solr as Audit Destination.
Date Tue, 17 Mar 2015 23:33:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
new file mode 100644
index 0000000..8272813
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -0,0 +1,1232 @@
+package org.apache.solr.client.solrj.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * SolrJ client class to communicate with SolrCloud.
+ * Instances of this class communicate with Zookeeper to discover
+ * Solr endpoints for SolrCloud collections, and then use the 
+ * {@link LBHttpSolrClient} to issue requests.
+ * 
+ * This class assumes the id field for your documents is called
+ * 'id' - if this is not the case, you must set the right name
+ * with {@link #setIdField(String)}.
+ */
+@SuppressWarnings("serial")
+public class CloudSolrClient extends SolrClient {
+  protected static final Logger log = LoggerFactory.getLogger(CloudSolrClient.class);
+
+  private volatile ZkStateReader zkStateReader;
+  private String zkHost; // the zk server connect string
+  private int zkConnectTimeout = 10000;
+  private int zkClientTimeout = 10000;
+  private volatile String defaultCollection;
+  private final LBHttpSolrClient lbClient;
+  private final boolean shutdownLBHttpSolrServer;
+  private HttpClient myClient;
+  private final boolean clientIsInternal;
+  // number of times the collection state is reloaded if a stale-state error is received
+  private static final int MAX_STALE_RETRIES = 5;
+  Random rand = new Random();
+  
+  private final boolean updatesToLeaders;
+  private boolean parallelUpdates = true;
+  private ExecutorService threadPool = Executors
+      .newCachedThreadPool(new SolrjNamedThreadFactory(
+          "CloudSolrServer ThreadPool"));
+  private String idField = "id";
+  public static final String STATE_VERSION = "_stateVer_";
+  private final Set<String> NON_ROUTABLE_PARAMS;
+  {
+    NON_ROUTABLE_PARAMS = new HashSet<>();
+    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+    
+    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+    
+    // Not supported via SolrCloud
+    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+  }
+  private volatile long timeToLive = 60 * 1000L;
+  private volatile List<Object> locks = objectList(3);
+
+
+  protected final Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
+    @Override
+    public ExpiringCachedDocCollection get(Object key) {
+      ExpiringCachedDocCollection val = super.get(key);
+      if(val == null) return null;
+      if(val.isExpired(timeToLive)) {
+        super.remove(key);
+        return null;
+      }
+      return val;
+    }
+
+  };
+
+  class ExpiringCachedDocCollection {
+    final DocCollection cached;
+    long cachedAt;
+
+    ExpiringCachedDocCollection(DocCollection cached) {
+      this.cached = cached;
+      this.cachedAt = System.currentTimeMillis();
+    }
+
+    boolean isExpired(long timeToLive) {
+      return (System.currentTimeMillis() - cachedAt) > timeToLive;
+    }
+  }
+
+  /**
+   * Create a new client object that connects to Zookeeper and is always aware
+   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
+   * SolrCloud has enough replicas for every shard in a collection, there is no
+   * single point of failure. Updates will be sent to shard leaders by default.
+   * 
+   * @param zkHost
+   *          The client endpoint of the zookeeper quorum containing the cloud
+   *          state. The full specification for this string is one or more comma
+   *          separated HOST:PORT values, followed by an optional chroot value
+   *          that starts with a forward slash. Using a chroot allows multiple
+   *          applications to coexist in one ensemble. For full details, see the
+   *          Zookeeper documentation. Some examples:
+   *          <p>
+   *          "host1:2181"
+   *          <p>
+   *          "host1:2181,host2:2181,host3:2181/mysolrchroot"
+   *          <p>
+   *          "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181"
+   */
+  public CloudSolrClient(String zkHost) {
+      this.zkHost = zkHost;
+      this.clientIsInternal = true;
+      this.myClient = HttpClientUtil.createClient(null);
+      this.lbClient = new LBHttpSolrClient(myClient);
+      this.lbClient.setRequestWriter(new BinaryRequestWriter());
+      this.lbClient.setParser(new BinaryResponseParser());
+      this.updatesToLeaders = true;
+      shutdownLBHttpSolrServer = true;
+      lbClient.addQueryParams(STATE_VERSION);
+  }
+
+  /**
+   * Create a new client object that connects to Zookeeper and is always aware
+   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
+   * SolrCloud has enough replicas for every shard in a collection, there is no
+   * single point of failure. Updates will be sent to shard leaders by default.
+   *
+   * @param zkHost
+   *          The client endpoint of the zookeeper quorum containing the cloud
+   *          state. The full specification for this string is one or more comma
+   *          separated HOST:PORT values, followed by an optional chroot value
+   *          that starts with a forward slash. Using a chroot allows multiple
+   *          applications to coexist in one ensemble. For full details, see the
+   *          Zookeeper documentation. Some examples:
+   *          <p>
+   *          "host1:2181"
+   *          <p>
+   *          "host1:2181,host2:2181,host3:2181/mysolrchroot"
+   *          <p>
+   *          "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181"
+   * @param httpClient
+   *          the {@link HttpClient} instance to be used for all requests. The
+   *          provided httpClient should use a multi-threaded connection manager.
+   */
+  public CloudSolrClient(String zkHost, HttpClient httpClient)  {
+    this.zkHost = zkHost;
+    this.clientIsInternal = httpClient == null;
+    this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
+    this.lbClient = new LBHttpSolrClient(myClient);
+    this.lbClient.setRequestWriter(new BinaryRequestWriter());
+    this.lbClient.setParser(new BinaryResponseParser());
+    this.updatesToLeaders = true;
+    shutdownLBHttpSolrServer = true;
+    lbClient.addQueryParams(STATE_VERSION);
+  }
+  
+  /**
+   * Create a new client object using multiple string values in a Collection
+   * instead of a standard zkHost connection string. Note that this constructor
+   * will not be chosen if there is only one String argument - that will use
+   * {@link #CloudSolrClient(String)} instead.
+   * 
+   * @param zkHosts
+   *          A Java Collection (List, Set, etc) of HOST:PORT strings, one for
+   *          each host in the zookeeper ensemble. Note that with certain
+   *          Collection types like HashSet, the order of hosts in the final
+   *          connect string may not be in the same order you added them.
+   * @param chroot
+   *          A chroot value for zookeeper, starting with a forward slash. If no
+   *          chroot is required, use null.
+   * @throws IllegalArgumentException
+   *           if the chroot value does not start with a forward slash.
+   * @see #CloudSolrClient(String)
+   */
+  public CloudSolrClient(Collection<String> zkHosts, String chroot) {
+    this(zkHosts, chroot, null);
+  }
+
+  /**
+   * Create a new client object using multiple string values in a Collection
+   * instead of a standard zkHost connection string. Note that this constructor
+   * will not be chosen if there is only one String argument - that will use
+   * {@link #CloudSolrClient(String)} instead.
+   *
+   * @param zkHosts
+   *          A Java Collection (List, Set, etc) of HOST:PORT strings, one for
+   *          each host in the zookeeper ensemble. Note that with certain
+   *          Collection types like HashSet, the order of hosts in the final
+   *          connect string may not be in the same order you added them.
+   * @param chroot
+   *          A chroot value for zookeeper, starting with a forward slash. If no
+   *          chroot is required, use null.
+   * @param httpClient
+   *          the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
+   *          multi-threaded connection manager.
+   * @throws IllegalArgumentException
+   *           if the chroot value does not start with a forward slash.
+   * @see #CloudSolrClient(String)
+   */
+  public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient) {
+    StringBuilder zkBuilder = new StringBuilder();
+    int lastIndexValue = zkHosts.size() - 1;
+    int i = 0;
+    for (String zkHost : zkHosts) {
+      zkBuilder.append(zkHost);
+      if (i < lastIndexValue) {
+        zkBuilder.append(",");
+      }
+      i++;
+    }
+    if (chroot != null) {
+      if (chroot.startsWith("/")) {
+        zkBuilder.append(chroot);
+      } else {
+        throw new IllegalArgumentException(
+            "The chroot must start with a forward slash.");
+      }
+    }
+
+    /* Log the constructed connection string and then initialize. */
+    log.info("Final constructed zkHost string: " + zkBuilder.toString());
+
+    this.zkHost = zkBuilder.toString();
+    this.clientIsInternal = httpClient == null;
+    this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
+    this.lbClient = new LBHttpSolrClient(myClient);
+    this.lbClient.setRequestWriter(new BinaryRequestWriter());
+    this.lbClient.setParser(new BinaryResponseParser());
+    this.updatesToLeaders = true;
+    shutdownLBHttpSolrServer = true;
+  }
+  
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param updatesToLeaders
+   *          If true, sends updates only to shard leaders.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, boolean updatesToLeaders) {
+    this(zkHost, updatesToLeaders, null);
+  }
+
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param updatesToLeaders
+   *          If true, sends updates only to shard leaders.
+   * @param httpClient
+   *          the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
+   *          multi-threaded connection manager.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, boolean updatesToLeaders, HttpClient httpClient) {
+    this.zkHost = zkHost;
+    this.clientIsInternal = httpClient == null;
+    this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
+    this.lbClient = new LBHttpSolrClient(myClient);
+    this.lbClient.setRequestWriter(new BinaryRequestWriter());
+    this.lbClient.setParser(new BinaryResponseParser());
+    this.updatesToLeaders = updatesToLeaders;
+    shutdownLBHttpSolrServer = true;
+    lbClient.addQueryParams(STATE_VERSION);
+  }
+
+  /**
+   * Sets the cache TTL for cached DocCollection objects. This is only applicable
+   * for collections that are persisted outside of clusterstate.json.
+   * @param seconds TTL value in seconds
+   */
+  public void setCollectionCacheTTl(int seconds) {
+    assert seconds > 0;
+    timeToLive = seconds * 1000L;
+  }
+
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param lbClient
+   *          LBHttpSolrClient instance for requests.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient) {
+    this(zkHost, lbClient, true);
+  }
+  
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param lbClient
+   *          LBHttpSolrClient instance for requests.
+   * @param updatesToLeaders
+   *          If true, sends updates only to shard leaders.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) {
+    this.zkHost = zkHost;
+    this.lbClient = lbClient;
+    this.updatesToLeaders = updatesToLeaders;
+    shutdownLBHttpSolrServer = false;
+    this.clientIsInternal = false;
+    lbClient.addQueryParams(STATE_VERSION);
+  }
+  
+  public ResponseParser getParser() {
+    return lbClient.getParser();
+  }
+  
+  /**
+   * Note: This setter method is <b>not thread-safe</b>.
+   * 
+   * @param processor
+   *          Default Response Parser chosen to parse the response if the parser
+   *          was not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(ResponseParser processor) {
+    lbClient.setParser(processor);
+  }
+  
+  public RequestWriter getRequestWriter() {
+    return lbClient.getRequestWriter();
+  }
+  
+  public void setRequestWriter(RequestWriter requestWriter) {
+    lbClient.setRequestWriter(requestWriter);
+  }
+
+  /**
+   * @return the zkHost value used to connect to zookeeper.
+   */
+  public String getZkHost() {
+    return zkHost;
+  }
+
+  public ZkStateReader getZkStateReader() {
+    return zkStateReader;
+  }
+
+  /**
+   * @param idField the field to route documents on.
+   */
+  public void setIdField(String idField) {
+    this.idField = idField;
+  }
+
+  /**
+   * @return the field that updates are routed on.
+   */
+  public String getIdField() {
+    return idField;
+  }
+  
+  /** Sets the default collection for requests. */
+  public void setDefaultCollection(String collection) {
+    this.defaultCollection = collection;
+  }
+
+  /** Gets the default collection for requests. */
+  public String getDefaultCollection() {
+    return defaultCollection;
+  }
+
+  /** Sets the connect timeout to the ZooKeeper ensemble in ms. */
+  public void setZkConnectTimeout(int zkConnectTimeout) {
+    this.zkConnectTimeout = zkConnectTimeout;
+  }
+
+  /** Sets the client session timeout to the ZooKeeper ensemble in ms. */
+  public void setZkClientTimeout(int zkClientTimeout) {
+    this.zkClientTimeout = zkClientTimeout;
+  }
+
+  /**
+   * Connect to the ZooKeeper ensemble.
+   * This is an optional method that may be used to force a connection before any other requests are sent.
+   */
+  public void connect() {
+    if (zkStateReader == null) {
+      synchronized (this) {
+        if (zkStateReader == null) {
+          ZkStateReader zk = null;
+          try {
+            zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
+            zk.createClusterStateWatchersAndUpdate();
+            zkStateReader = zk;
+          } catch (InterruptedException e) {
+            zk.close();
+            Thread.currentThread().interrupt();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+          } catch (KeeperException e) {
+            zk.close();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+          } catch (Exception e) {
+            if (zk != null) zk.close();
+            // do not wrap because clients may be relying on the underlying exception being thrown
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  public void setParallelUpdates(boolean parallelUpdates) {
+    this.parallelUpdates = parallelUpdates;
+  }
+
+  /**
+   * Upload a set of config files to Zookeeper and give it a name
+   *
+   * NOTE: You should only allow trusted users to upload configs.  If you
+   * are allowing client access to zookeeper, you should protect the
+   * /configs node against unauthorised write access.
+   *
+   * @param configPath {@link java.nio.file.Path} to the config files
+   * @param configName the name of the config
+   * @throws IOException if an IO error occurs
+   */
+  public void uploadConfig(Path configPath, String configName) throws IOException {
+    zkStateReader.getConfigManager().uploadConfigDir(configPath, configName);
+  }
+
+  /**
+   * Download a named config from Zookeeper to a location on the filesystem
+   * @param configName    the name of the config
+   * @param downloadPath  the path to write config files to
+   * @throws IOException  if an I/O exception occurs
+   */
+  public void downloadConfig(String configName, Path downloadPath) throws IOException {
+    zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
+  }
+
+  private NamedList<Object> directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException {
+    UpdateRequest updateRequest = (UpdateRequest) request;
+    ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+    ModifiableSolrParams routableParams = new ModifiableSolrParams();
+    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+    if(params != null) {
+      nonRoutableParams.add(params);
+      routableParams.add(params);
+      for(String param : NON_ROUTABLE_PARAMS) {
+        routableParams.remove(param);
+      }
+    }
+
+    String collection = nonRoutableParams.get(UpdateParams.COLLECTION, defaultCollection);
+    if (collection == null) {
+      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+    }
+
+
+    //Check to see if the collection is an alias.
+    Aliases aliases = zkStateReader.getAliases();
+    if(aliases != null) {
+      Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
+      if(collectionAliases != null && collectionAliases.containsKey(collection)) {
+        collection = collectionAliases.get(collection);
+      }
+    }
+
+    DocCollection col = getDocCollection(clusterState, collection,null);
+
+    DocRouter router = col.getRouter();
+    
+    if (router instanceof ImplicitDocRouter) {
+      // short circuit as optimization
+      return null;
+    }
+
+    //Create the URL map, which is keyed on slice name.
+    //The value is a list of URLs for each replica in the slice.
+    //The first value in the list is the leader for the slice.
+    Map<String,List<String>> urlMap = buildUrlMap(col);
+    if (urlMap == null) {
+      // we could not find a leader yet - use unoptimized general path
+      return null;
+    }
+
+    NamedList<Throwable> exceptions = new NamedList<>();
+    NamedList<NamedList> shardResponses = new NamedList<>();
+
+    Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+    if (routes == null) {
+      return null;
+    }
+
+    long start = System.nanoTime();
+
+    if (parallelUpdates) {
+      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
+      for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+        final String url = entry.getKey();
+        final LBHttpSolrClient.Req lbRequest = entry.getValue();
+        responseFutures.put(url, threadPool.submit(new Callable<NamedList<?>>() {
+          @Override
+          public NamedList<?> call() throws Exception {
+            return lbClient.request(lbRequest).getResponse();
+          }
+        }));
+      }
+
+      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
+        final String url = entry.getKey();
+        final Future<NamedList<?>> responseFuture = entry.getValue();
+        try {
+          shardResponses.add(url, responseFuture.get());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          exceptions.add(url, e.getCause());
+        }
+      }
+
+      if (exceptions.size() > 0) {
+        throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+      }
+    } else {
+      for (Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+        String url = entry.getKey();
+        LBHttpSolrClient.Req lbRequest = entry.getValue();
+        try {
+          NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
+          shardResponses.add(url, rsp);
+        } catch (Exception e) {
+          throw new SolrServerException(e);
+        }
+      }
+    }
+
+    UpdateRequest nonRoutableRequest = null;
+    List<String> deleteQuery = updateRequest.getDeleteQuery();
+    if (deleteQuery != null && deleteQuery.size() > 0) {
+      UpdateRequest deleteQueryRequest = new UpdateRequest();
+      deleteQueryRequest.setDeleteQuery(deleteQuery);
+      nonRoutableRequest = deleteQueryRequest;
+    }
+    
+    Set<String> paramNames = nonRoutableParams.getParameterNames();
+    
+    Set<String> intersection = new HashSet<>(paramNames);
+    intersection.retainAll(NON_ROUTABLE_PARAMS);
+    
+    if (nonRoutableRequest != null || intersection.size() > 0) {
+      if (nonRoutableRequest == null) {
+        nonRoutableRequest = new UpdateRequest();
+      }
+      nonRoutableRequest.setParams(nonRoutableParams);
+      List<String> urlList = new ArrayList<>();
+      urlList.addAll(routes.keySet());
+      Collections.shuffle(urlList, rand);
+      LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(nonRoutableRequest, urlList);
+      try {
+        LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+        shardResponses.add(urlList.get(0), rsp.getResponse());
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+      }
+    }
+
+    long end = System.nanoTime();
+
+    RouteResponse rr = condenseResponse(shardResponses, (long) ((end - start) / 1000000));
+    rr.setRouteResponses(shardResponses);
+    rr.setRoutes(routes);
+    return rr;
+  }
+
+  private Map<String,List<String>> buildUrlMap(DocCollection col) {
+    Map<String, List<String>> urlMap = new HashMap<>();
+    Collection<Slice> slices = col.getActiveSlices();
+    Iterator<Slice> sliceIterator = slices.iterator();
+    while (sliceIterator.hasNext()) {
+      Slice slice = sliceIterator.next();
+      String name = slice.getName();
+      List<String> urls = new ArrayList<>();
+      Replica leader = slice.getLeader();
+      if (leader == null) {
+        // take unoptimized general path - we cannot find a leader yet
+        return null;
+      }
+      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+      String url = zkProps.getCoreUrl();
+      urls.add(url);
+      Collection<Replica> replicas = slice.getReplicas();
+      Iterator<Replica> replicaIterator = replicas.iterator();
+      while (replicaIterator.hasNext()) {
+        Replica replica = replicaIterator.next();
+        if (!replica.getNodeName().equals(leader.getNodeName()) &&
+            !replica.getName().equals(leader.getName())) {
+          ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+          String url1 = zkProps1.getCoreUrl();
+          urls.add(url1);
+        }
+      }
+      urlMap.put(name, urls);
+    }
+    return urlMap;
+  }
+
+  public RouteResponse condenseResponse(NamedList response, long timeMillis) {
+    RouteResponse condensed = new RouteResponse();
+    int status = 0;
+    Integer rf = null;
+    Integer minRf = null;
+    for(int i=0; i<response.size(); i++) {
+      NamedList shardResponse = (NamedList)response.getVal(i);
+      NamedList header = (NamedList)shardResponse.get("responseHeader");      
+      Integer shardStatus = (Integer)header.get("status");
+      int s = shardStatus.intValue();
+      if (s > 0) {
+        status = s;
+      }
+      Object rfObj = header.get(UpdateRequest.REPFACT);
+      if (rfObj != null && rfObj instanceof Integer) {
+        Integer routeRf = (Integer)rfObj;
+        if (rf == null || routeRf < rf)
+          rf = routeRf;
+      }
+      minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
+    }
+
+    NamedList cheader = new NamedList();
+    cheader.add("status", status);
+    cheader.add("QTime", timeMillis);
+    if (rf != null)
+      cheader.add(UpdateRequest.REPFACT, rf);
+    if (minRf != null)
+      cheader.add(UpdateRequest.MIN_REPFACT, minRf);
+    
+    condensed.add("responseHeader", cheader);
+    return condensed;
+  }
+
+  public static class RouteResponse extends NamedList {
+    private NamedList routeResponses;
+    private Map<String, LBHttpSolrClient.Req> routes;
+
+    public void setRouteResponses(NamedList routeResponses) {
+      this.routeResponses = routeResponses;
+    }
+
+    public NamedList getRouteResponses() {
+      return routeResponses;
+    }
+
+    public void setRoutes(Map<String, LBHttpSolrClient.Req> routes) {
+      this.routes = routes;
+    }
+
+    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+      return routes;
+    }
+
+  }
+
+  public static class RouteException extends SolrException {
+
+    private NamedList<Throwable> throwables;
+    private Map<String, LBHttpSolrClient.Req> routes;
+
+    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, LBHttpSolrClient.Req> routes){
+      super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
+      this.throwables = throwables;
+      this.routes = routes;
+    }
+
+    public NamedList<Throwable> getThrowables() {
+      return throwables;
+    }
+
+    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+      return this.routes;
+    }
+  }
+
+  @Override
+  public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
+    SolrParams reqParams = request.getParams();
+    String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
+    return requestWithRetryOnStaleState(request, 0, collection);
+  }
+
+  /**
+   * As this class doesn't watch external collections on the client side,
+   * there's a chance that the request will fail due to cached stale state,
+   * which means the state must be refreshed from ZK and retried.
+   */
+  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
+      throws SolrServerException, IOException {
+
+    connect(); // important to call this before you start working with the ZkStateReader
+
+    // build up a _stateVer_ param to pass to the server containing all of the
+    // external collection state versions involved in this request, which allows
+    // the server to notify us that our cached state for one or more of the external
+    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
+    String stateVerParam = null;
+    List<DocCollection> requestedCollections = null;
+    if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
+      Set<String> requestedCollectionNames = getCollectionNames(getZkStateReader().getClusterState(), collection);
+
+      StringBuilder stateVerParamBuilder = null;
+      for (String requestedCollection : requestedCollectionNames) {
+        // track the version of state we're using on the client side using the _stateVer_ param
+        DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null);
+        int collVer = coll.getZNodeVersion();
+        if (coll.getStateFormat()>1) {
+          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+          requestedCollections.add(coll);
+
+          if (stateVerParamBuilder == null) {
+            stateVerParamBuilder = new StringBuilder();
+          } else {
+            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+          }
+
+          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+        }
+      }
+
+      if (stateVerParamBuilder != null) {
+        stateVerParam = stateVerParamBuilder.toString();
+      }
+    }
+
+    if (request.getParams() instanceof ModifiableSolrParams) {
+      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+      if (stateVerParam != null) {
+        params.set(STATE_VERSION, stateVerParam);
+      } else {
+        params.remove(STATE_VERSION);
+      }
+    } // else: ??? how to set this ???
+
+    NamedList<Object> resp = null;
+    try {
+      resp = sendRequest(request);
+      // to avoid an O(n) operation, STATE_VERSION is always added last, so try to read it from the end
+      Object o = resp.get(STATE_VERSION, resp.size()-1);
+      if(o != null && o instanceof Map) {
+        // remove it: nothing else needs it, and tests comparing responses would otherwise fail
+        resp.remove(resp.size()-1);
+        Map invalidStates = (Map) o;
+        for (Object invalidEntries : invalidStates.entrySet()) {
+          Map.Entry e = (Map.Entry) invalidEntries;
+          getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue());
+        }
+
+      }
+    } catch (Exception exc) {
+
+      Throwable rootCause = SolrException.getRootCause(exc);
+      // don't do retry support for admin requests or if the request doesn't have a collection specified
+      if (collection == null || request.getPath().startsWith("/admin")) {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException) exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException) exc;
+        } else if (exc instanceof RuntimeException) {
+          throw (RuntimeException) exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+
+      int errorCode = (rootCause instanceof SolrException) ?
+          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+      log.error("Request to collection {} failed due to ("+errorCode+
+          ") {}, retry? "+retryCount, collection, rootCause.toString());
+
+      boolean wasCommError =
+          (rootCause instanceof ConnectException ||
+              rootCause instanceof ConnectTimeoutException ||
+              rootCause instanceof NoHttpResponseException ||
+              rootCause instanceof SocketException);
+
+      boolean stateWasStale = false;
+      if (retryCount < MAX_STALE_RETRIES  &&
+          requestedCollections != null    &&
+          !requestedCollections.isEmpty() &&
+          SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
+      {
+        // cached state for one or more external collections was stale
+        // re-issue request using updated state
+        stateWasStale = true;
+
+        // just re-read state for all of them, which is a little heavy-handed but hopefully a rare occurrence
+        for (DocCollection ext : requestedCollections) {
+          collectionStateCache.remove(ext.getName());
+        }
+      }
+
+      // if we experienced a communication error, it's worth checking the state
+      // with ZK just to make sure the node we're trying to hit is still part of the collection
+      if (retryCount < MAX_STALE_RETRIES &&
+          !stateWasStale &&
+          requestedCollections != null &&
+          !requestedCollections.isEmpty() &&
+          wasCommError) {
+        for (DocCollection ext : requestedCollections) {
+          DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null);
+          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+            // looks like we couldn't reach the server because the state was stale == retry
+            stateWasStale = true;
+            // we just pulled state from ZK, so update the cache so that the retry uses it
+            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+          }
+        }
+      }
+
+      if (requestedCollections != null) {
+        requestedCollections.clear(); // done with this
+      }
+
+      // if the state was stale, then we retry the request once with new state pulled from Zk
+      if (stateWasStale) {
+        log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server.");
+        resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
+      } else {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException)exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException)exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+    }
+
+    return resp;
+  }
+
+  protected NamedList<Object> sendRequest(SolrRequest request)
+      throws SolrServerException, IOException {
+    connect();
+    
+    ClusterState clusterState = zkStateReader.getClusterState();
+    
+    boolean sendToLeaders = false;
+    List<String> replicas = null;
+    
+    if (request instanceof IsUpdateRequest) {
+      if (request instanceof UpdateRequest) {
+        NamedList<Object> response = directUpdate((AbstractUpdateRequest) request,
+            clusterState);
+        if (response != null) {
+          return response;
+        }
+      }
+      sendToLeaders = true;
+      replicas = new ArrayList<>();
+    }
+    
+    SolrParams reqParams = request.getParams();
+    if (reqParams == null) {
+      reqParams = new ModifiableSolrParams();
+    }
+    List<String> theUrlList = new ArrayList<>();
+    if (request.getPath().equals("/admin/collections")
+        || request.getPath().equals("/admin/cores")) {
+      Set<String> liveNodes = clusterState.getLiveNodes();
+      for (String liveNode : liveNodes) {
+        theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode));
+      }
+    } else {
+      String collection = reqParams.get(UpdateParams.COLLECTION, defaultCollection);
+      
+      if (collection == null) {
+        throw new SolrServerException(
+            "No collection param specified on request and no default collection has been set.");
+      }
+      
+      Set<String> collectionNames = getCollectionNames(clusterState, collection);
+      if (collectionNames.size() == 0) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "Could not find collection: " + collection);
+      }
+
+      String shardKeys =  reqParams.get(ShardParams._ROUTE_);
+
+      // TODO: not a big deal because of the caching, but we could avoid
+      // looking at every shard when getting leaders if we tweaked some things
+
+      // Retrieve slices from the cloud state and, for each collection
+      // specified, add it to the Map of slices.
+      Map<String,Slice> slices = new HashMap<>();
+      for (String collectionName : collectionNames) {
+        DocCollection col = getDocCollection(clusterState, collectionName, null);
+        Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
+        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
+      }
+      Set<String> liveNodes = clusterState.getLiveNodes();
+
+      List<String> leaderUrlList = null;
+      List<String> urlList = null;
+      List<String> replicasList = null;
+      
+      // build a map of unique nodes
+      // TODO: allow filtering by group, role, etc
+      Map<String,ZkNodeProps> nodes = new HashMap<>();
+      List<String> urlList2 = new ArrayList<>();
+      for (Slice slice : slices.values()) {
+        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
+          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+          String node = coreNodeProps.getNodeName();
+          if (!liveNodes.contains(coreNodeProps.getNodeName())
+              || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
+          if (nodes.put(node, nodeProps) == null) {
+            if (!sendToLeaders || coreNodeProps.isLeader()) {
+              String url;
+              if (reqParams.get(UpdateParams.COLLECTION) == null) {
+                url = ZkCoreNodeProps.getCoreUrl(
+                    nodeProps.getStr(ZkStateReader.BASE_URL_PROP),
+                    defaultCollection);
+              } else {
+                url = coreNodeProps.getCoreUrl();
+              }
+              urlList2.add(url);
+            } else if (sendToLeaders) {
+              String url;
+              if (reqParams.get(UpdateParams.COLLECTION) == null) {
+                url = ZkCoreNodeProps.getCoreUrl(
+                    nodeProps.getStr(ZkStateReader.BASE_URL_PROP),
+                    defaultCollection);
+              } else {
+                url = coreNodeProps.getCoreUrl();
+              }
+              replicas.add(url);
+            }
+          }
+        }
+      }
+      
+      if (sendToLeaders) {
+        leaderUrlList = urlList2;
+        replicasList = replicas;
+      } else {
+        urlList = urlList2;
+      }
+      
+      if (sendToLeaders) {
+        theUrlList = new ArrayList<>(leaderUrlList.size());
+        theUrlList.addAll(leaderUrlList);
+      } else {
+        theUrlList = new ArrayList<>(urlList.size());
+        theUrlList.addAll(urlList);
+      }
+      if (theUrlList.isEmpty()) {
+        for (String s : collectionNames) {
+          if (s != null) collectionStateCache.remove(s);
+        }
+        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Not enough nodes to handle the request");
+      }
+
+      Collections.shuffle(theUrlList, rand);
+      if (sendToLeaders) {
+        ArrayList<String> theReplicas = new ArrayList<>(
+            replicasList.size());
+        theReplicas.addAll(replicasList);
+        Collections.shuffle(theReplicas, rand);
+        theUrlList.addAll(theReplicas);
+      }
+      
+    }
+    
+    LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList);
+    LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+    return rsp.getResponse();
+  }
+
+  private Set<String> getCollectionNames(ClusterState clusterState,
+                                         String collection) {
+    // Extract each comma separated collection name and store in a List.
+    List<String> rawCollectionsList = StrUtils.splitSmart(collection, ",", true);
+    Set<String> collectionNames = new HashSet<>();
+    // validate collections
+    for (String collectionName : rawCollectionsList) {
+      if (!clusterState.getCollections().contains(collectionName)) {
+        Aliases aliases = zkStateReader.getAliases();
+        String alias = aliases.getCollectionAlias(collectionName);
+        if (alias != null) {
+          List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+          collectionNames.addAll(aliasList);
+          continue;
+        }
+
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+      }
+
+      collectionNames.add(collectionName);
+    }
+    return collectionNames;
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdown();
+  }
+
+  @Override
+  @Deprecated
+  public void shutdown() {
+    if (zkStateReader != null) {
+      synchronized (this) {
+        if (zkStateReader != null)
+          zkStateReader.close();
+        zkStateReader = null;
+      }
+    }
+
+    if (shutdownLBHttpSolrServer) {
+      lbClient.shutdown();
+    }
+
+    if (clientIsInternal && myClient != null) {
+      HttpClientUtil.close(myClient);
+    }
+
+    if (this.threadPool != null && !this.threadPool.isShutdown()) {
+      this.threadPool.shutdown();
+    }
+  }
+
+  public LBHttpSolrClient getLbClient() {
+    return lbClient;
+  }
+  
+  public boolean isUpdatesToLeaders() {
+    return updatesToLeaders;
+  }
+
+  /**
+   * If caches are expired they are refreshed after acquiring a lock.
+   * Use this to set the number of locks.
+   */
+  public void setParallelCacheRefreshes(int n) { locks = objectList(n); }
+
+  private static ArrayList<Object> objectList(int n) {
+    ArrayList<Object> l =  new ArrayList<>(n);
+    for(int i=0;i<n;i++) l.add(new Object());
+    return l;
+  }
+
+
+  protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException {
+    if (collection == null) return null;
+    DocCollection col = getFromCache(collection);
+    if (col != null) {
+      if (expectedVersion == null) return col;
+      if (expectedVersion.intValue() == col.getZNodeVersion()) return col;
+    }
+
+    ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection);
+    if (ref == null) {
+      //no such collection exists
+      return null;
+    }
+    if (!ref.isLazilyLoaded()) {
+      //it is readily available just return it
+      return ref.get();
+    }
+    List locks = this.locks;
+    final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
+    synchronized (lock) {
+      // we may have waited for some time, so check once again
+      col = getFromCache(collection);
+      if (col != null) {
+        if (expectedVersion == null) return col;
+        if (expectedVersion.intValue() == col.getZNodeVersion()) {
+          return col;
+        } else {
+          collectionStateCache.remove(collection);
+        }
+      }
+      col = ref.get(); // this is a call to ZK
+    }
+    if (col == null) return null;
+    if (col.getStateFormat() > 1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
+    return col;
+  }
+
+  private DocCollection getFromCache(String c){
+    ExpiringCachedDocCollection cachedState = collectionStateCache.get(c);
+    return cachedState != null ? cachedState.cached : null;
+  }
+
+
+  /**
+   * Useful for determining the minimum achieved replication factor across
+   * all shards involved in processing an update request, typically useful
+   * for gauging the replication factor of a batch. 
+   */
+  @SuppressWarnings("rawtypes")
+  public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
+    // it's probably already on the top-level header set by condense
+    NamedList header = (NamedList)resp.get("responseHeader");
+    Integer achRf = (Integer)header.get(UpdateRequest.REPFACT);
+    if (achRf != null)
+      return achRf.intValue();
+
+    // not on the top-level header, walk the shard route tree
+    Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp);
+    for (Integer rf : shardRf.values()) {
+      if (achRf == null || rf < achRf) {
+        achRf = rf;
+      }
+    }    
+    return (achRf != null) ? achRf.intValue() : -1;
+  }
+  
+  /**
+   * Walks the NamedList response after performing an update request looking for
+   * the replication factor that was achieved in each shard involved in the request.
+   * For single doc updates, there will be only one shard in the return value. 
+   */
+  @SuppressWarnings("rawtypes")
+  public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
+    connect();
+    
+    Map<String,Integer> results = new HashMap<String,Integer>();
+    if (resp instanceof CloudSolrClient.RouteResponse) {
+      NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
+      ClusterState clusterState = zkStateReader.getClusterState();     
+      Map<String,String> leaders = new HashMap<String,String>();
+      for (Slice slice : clusterState.getActiveSlices(collection)) {
+        Replica leader = slice.getLeader();
+        if (leader != null) {
+          ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+          String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
+          leaders.put(leaderUrl, slice.getName());
+          String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
+          leaders.put(altLeaderUrl, slice.getName());
+        }
+      }
+      
+      Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
+      while (routeIter.hasNext()) {
+        Map.Entry<String,Object> next = routeIter.next();
+        String host = next.getKey();
+        NamedList hostResp = (NamedList)next.getValue();
+        Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
+        if (rf != null) {
+          String shard = leaders.get(host);
+          if (shard == null) {
+            if (host.endsWith("/"))
+              shard = leaders.get(host.substring(0,host.length()-1));
+            if (shard == null) {
+              shard = host;
+            }
+          }
+          results.put(shard, rf);
+        }
+      }
+    }    
+    return results;
+  }
+}
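
For context on how the client added above is typically driven, here is a minimal
usage sketch against this SolrJ API; the ZooKeeper host string and collection
name are hypothetical placeholders, not values taken from this commit:

  import org.apache.solr.client.solrj.SolrQuery;
  import org.apache.solr.client.solrj.impl.CloudSolrClient;
  import org.apache.solr.client.solrj.response.QueryResponse;
  import org.apache.solr.common.SolrInputDocument;

  public class CloudSolrClientSketch {
    public static void main(String[] args) throws Exception {
      // Hypothetical ZK ensemble plus chroot; see the zkHost javadoc above for the format.
      CloudSolrClient client = new CloudSolrClient("zk1:2181,zk2:2181,zk3:2181/solr");
      client.setDefaultCollection("my_collection"); // used when no collection param is set
      client.connect();                             // optional: forces the ZK connection up front

      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "doc-1");                  // updates route on 'id' unless setIdField() is called
      client.add(doc);
      client.commit();

      QueryResponse rsp = client.query(new SolrQuery("*:*"));
      System.out.println("hits: " + rsp.getResults().getNumFound());

      client.close();                               // releases the ZkStateReader and the internal LB client
    }
  }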

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
new file mode 100644
index 0000000..4e2a2e7
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import org.apache.http.client.HttpClient;
+
+import java.util.Collection;
+
+/**
+ * @deprecated Use {@link org.apache.solr.client.solrj.impl.CloudSolrClient}
+ */
+@Deprecated
+public class CloudSolrServer extends CloudSolrClient {
+
+  public CloudSolrServer(String zkHost) {
+    super(zkHost);
+  }
+
+  public CloudSolrServer(String zkHost, HttpClient httpClient) {
+    super(zkHost, httpClient);
+  }
+
+  public CloudSolrServer(Collection<String> zkHosts, String chroot) {
+    super(zkHosts, chroot);
+  }
+
+  public CloudSolrServer(Collection<String> zkHosts, String chroot, HttpClient httpClient) {
+    super(zkHosts, chroot, httpClient);
+  }
+
+  public CloudSolrServer(String zkHost, boolean updatesToLeaders) {
+    super(zkHost, updatesToLeaders);
+  }
+
+  public CloudSolrServer(String zkHost, boolean updatesToLeaders, HttpClient httpClient) {
+    super(zkHost, updatesToLeaders, httpClient);
+  }
+
+  public CloudSolrServer(String zkHost, LBHttpSolrClient lbClient) {
+    super(zkHost, lbClient);
+  }
+
+  public CloudSolrServer(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) {
+    super(zkHost, lbClient, updatesToLeaders);
+  }
+}
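
Since every constructor in this deprecated shim delegates 1:1 to the matching
CloudSolrClient constructor, migrating callers is a straight rename; a minimal
sketch (hostname hypothetical):

  // Deprecated: still compiles, but emits a deprecation warning.
  CloudSolrServer server = new CloudSolrServer("zk1:2181/solr");

  // Preferred replacement, behaviorally identical.
  CloudSolrClient client = new CloudSolrClient("zk1:2181/solr");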

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
new file mode 100644
index 0000000..5e65021
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentProducer;
+import org.apache.http.entity.EntityTemplate;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ConcurrentUpdateSolrClient buffers all added documents and writes
+ * them into open HTTP connections. This class is thread safe.
+ * 
+ * Params from {@link UpdateRequest} are converted to HTTP request
+ * parameters. When params change between UpdateRequests, a new HTTP
+ * request is started.
+ * 
+ * Although any SolrClient request can be made with this implementation,
+ * it is recommended to use ConcurrentUpdateSolrClient only for /update
+ * requests. The class {@link HttpSolrClient} is better suited for the
+ * query interface.
+ */
+public class ConcurrentUpdateSolrClient extends SolrClient {
+  private static final long serialVersionUID = 1L;
+  static final Logger log = LoggerFactory
+      .getLogger(ConcurrentUpdateSolrClient.class);
+  private HttpSolrClient client;
+  final BlockingQueue<UpdateRequest> queue;
+  final ExecutorService scheduler;
+  final Queue<Runner> runners;
+  volatile CountDownLatch lock = null; // used to block everything
+  final int threadCount;
+  boolean shutdownExecutor = false;
+  int pollQueueTime = 250;
+  private final boolean streamDeletes;
+
+  /**
+   * Uses an internally managed HttpClient instance.
+   * 
+   * @param solrServerUrl
+   *          The Solr server URL
+   * @param queueSize
+   *          The buffer size before the documents are sent to the server
+   * @param threadCount
+   *          The number of background threads used to empty the queue
+   */
+  public ConcurrentUpdateSolrClient(String solrServerUrl, int queueSize,
+                                    int threadCount) {
+    this(solrServerUrl, null, queueSize, threadCount);
+    shutdownExecutor = true;
+  }
+  
+  public ConcurrentUpdateSolrClient(String solrServerUrl,
+                                    HttpClient client, int queueSize, int threadCount) {
+    this(solrServerUrl, client, queueSize, threadCount, Executors.newCachedThreadPool(
+        new SolrjNamedThreadFactory("concurrentUpdateScheduler")));
+    shutdownExecutor = true;
+  }
+
+  /**
+   * Uses the supplied HttpClient to send documents to the Solr server.
+   */
+  public ConcurrentUpdateSolrClient(String solrServerUrl,
+                                    HttpClient client, int queueSize, int threadCount, ExecutorService es) {
+    this(solrServerUrl, client, queueSize, threadCount, es, false);
+  }
+  
+  /**
+   * Uses the supplied HttpClient to send documents to the Solr server.
+   */
+  public ConcurrentUpdateSolrClient(String solrServerUrl,
+                                    HttpClient client, int queueSize, int threadCount, ExecutorService es, boolean streamDeletes) {
+    this.client = new HttpSolrClient(solrServerUrl, client);
+    this.client.setFollowRedirects(false);
+    queue = new LinkedBlockingQueue<>(queueSize);
+    this.threadCount = threadCount;
+    runners = new LinkedList<>();
+    scheduler = es;
+    this.streamDeletes = streamDeletes;
+  }
+
+  public Set<String> getQueryParams() {
+    return this.client.getQueryParams();
+  }
+
+  /**
+   * Expert Method.
+   * @param queryParams set of param keys to only send via the query string
+   */
+  public void setQueryParams(Set<String> queryParams) {
+    this.client.setQueryParams(queryParams);
+  }
+  
+  /**
+   * Opens a single HTTP connection and streams queued requests over it
+   * until the queue is empty.
+   */
+  class Runner implements Runnable {
+    final Lock runnerLock = new ReentrantLock();
+
+    @Override
+    public void run() {
+      runnerLock.lock();
+
+      log.debug("starting runner: {}", this);
+      HttpPost method = null;
+      HttpResponse response = null;            
+      try {
+        while (!queue.isEmpty()) {
+          try {
+            final UpdateRequest updateRequest = 
+                queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+            if (updateRequest == null)
+              break;
+                       
+            String contentType = client.requestWriter.getUpdateContentType();
+            final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
+
+            final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+
+            EntityTemplate template = new EntityTemplate(new ContentProducer() {
+
+              @Override
+              public void writeTo(OutputStream out) throws IOException {
+                try {
+                  if (isXml) {
+                    out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // wrapper element; the tag name is arbitrary
+                  }                                    
+                  UpdateRequest req = updateRequest;
+                  while (req != null) {                                        
+                    SolrParams currentParams = new ModifiableSolrParams(req.getParams());
+                    if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
+                      queue.add(req); // params are different, push back to queue
+                      break;
+                    }
+                    
+                    client.requestWriter.write(req, out);
+                    if (isXml) {
+                      // check for commit or optimize
+                      SolrParams params = req.getParams();
+                      if (params != null) {
+                        String fmt = null;
+                        if (params.getBool(UpdateParams.OPTIMIZE, false)) {
+                          fmt = "<optimize waitSearcher=\"%s\" />";
+                        } else if (params.getBool(UpdateParams.COMMIT, false)) {
+                          fmt = "<commit waitSearcher=\"%s\" />";
+                        }
+                        if (fmt != null) {
+                          byte[] content = String.format(Locale.ROOT,
+                              fmt,
+                              params.getBool(UpdateParams.WAIT_SEARCHER, false)
+                                  + "").getBytes(StandardCharsets.UTF_8);
+                          out.write(content);
+                        }
+                      }
+                    }
+                    out.flush();
+                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+                  }
+                  
+                  if (isXml) {
+                    out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+                  }
+
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  log.warn("Interrupted while streaming update request", e);
+                }
+              }
+            });
+            
+            // The parser 'wt=' and 'version=' params are used instead of the
+            // original params
+            ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
+            requestParams.set(CommonParams.WT, client.parser.getWriterType());
+            requestParams.set(CommonParams.VERSION, client.parser.getVersion());
+
+            method = new HttpPost(client.getBaseURL() + "/update"
+                + ClientUtils.toQueryString(requestParams, false));
+            method.setEntity(template);
+            method.addHeader("User-Agent", HttpSolrClient.AGENT);
+            method.addHeader("Content-Type", contentType);
+                        
+            response = client.getHttpClient().execute(method);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != HttpStatus.SC_OK) {
+              StringBuilder msg = new StringBuilder();
+              msg.append(response.getStatusLine().getReasonPhrase());
+              msg.append("\n\n\n\n");
+              msg.append("request: ").append(method.getURI());
+
+              SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
+              // parse out the metadata from the SolrException
+              try {
+                NamedList<Object> resp =
+                    client.parser.processResponse(response.getEntity().getContent(),
+                        response.getEntity().getContentType().getValue());
+                NamedList<Object> error = (NamedList<Object>) resp.get("error");
+                if (error != null)
+                  solrExc.setMetadata((NamedList<String>) error.get("metadata"));
+              } catch (Exception exc) {
+                // don't want to fail to report error if parsing the response fails
+                log.warn("Failed to parse error response from " + client.getBaseURL() + " due to: " + exc);
+              }
+
+              handleError(solrExc);
+            } else {
+              onSuccess(response);
+            }
+          } finally {
+            try {
+              if (response != null) {
+                response.getEntity().getContent().close();
+              }
+            } catch (Exception ex) {
+              log.warn("Failed to close response content stream", ex);
+            }
+          }
+        }
+      } catch (Throwable e) {
+        if (e instanceof OutOfMemoryError) {
+          throw (OutOfMemoryError) e;
+        }
+        handleError(e);
+      } finally {
+        synchronized (runners) {
+          if (runners.size() == 1 && !queue.isEmpty()) {
+            // keep this runner alive
+            scheduler.execute(this);
+          } else {
+            runners.remove(this);
+            if (runners.isEmpty())
+              runners.notifyAll();
+          }
+        }
+
+        log.debug("finished: {}", this);
+        runnerLock.unlock();
+      }
+    }
+  }
+
+  @Override
+  public NamedList<Object> request(final SolrRequest request)
+      throws SolrServerException, IOException {
+    if (!(request instanceof UpdateRequest)) {
+      return client.request(request);
+    }
+    UpdateRequest req = (UpdateRequest) request;
+
+    // a request with no documents or deletes (e.g. commit or optimize) is sent directly
+    if (streamDeletes) {
+      if ((req.getDocuments() == null || req.getDocuments().isEmpty())
+          && (req.getDeleteById() == null || req.getDeleteById().isEmpty())
+          && (req.getDeleteByIdMap() == null || req.getDeleteByIdMap().isEmpty())) {
+        if (req.getDeleteQuery() == null) {
+          blockUntilFinished();
+          return client.request(request);
+        }
+      }
+    } else {
+      if ((req.getDocuments() == null || req.getDocuments().isEmpty())) {
+        blockUntilFinished();
+        return client.request(request);
+      }
+    }
+
+
+    SolrParams params = req.getParams();
+    if (params != null) {
+      // check if it is waiting for the searcher
+      if (params.getBool(UpdateParams.WAIT_SEARCHER, false)) {
+        log.info("blocking for commit/optimize");
+        blockUntilFinished(); // empty the queue
+        return client.request(request);
+      }
+    }
+
+    try {
+      CountDownLatch tmpLock = lock;
+      if (tmpLock != null) {
+        tmpLock.await();
+      }
+
+      boolean success = queue.offer(req);
+
+      for (;;) {
+        synchronized (runners) {
+          // start a runner if none are active, or if the queue is more than
+          // half full and we are still below the configured thread count
+          if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
+          {
+            // We need more runners, so start a new one.
+            Runner r = new Runner();
+            runners.add(r);
+            scheduler.execute(r);
+          } else {
+            // break out of the retry loop if we added the element to the queue
+            // successfully, *and*
+            // while we are still holding the runners lock to prevent race
+            // conditions.
+            if (success)
+              break;
+          }
+        }
+
+        // Retry to add to the queue w/o the runners lock held (else we risk
+        // temporary deadlock)
+        // This retry could also fail because
+        // 1) existing runners were not able to take off any new elements in the
+        // queue
+        // 2) the queue was filled back up since our last try
+        // If we succeed, the queue may have been completely emptied, and all
+        // runners stopped.
+        // In all cases, we should loop back to the top to see if we need to
+        // start more runners.
+        //
+        if (!success) {
+          success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
+        }
+      }
+    } catch (InterruptedException e) {
+      log.error("interrupted", e);
+      throw new IOException(e.getLocalizedMessage());
+    }
+
+    // return a dummy result; the update is processed asynchronously in the background
+    NamedList<Object> dummy = new NamedList<>();
+    dummy.add("NOTE", "the request is processed in a background stream");
+    return dummy;
+  }
+
+  public synchronized void blockUntilFinished() {
+    lock = new CountDownLatch(1);
+    try {
+      synchronized (runners) {
+        while (!runners.isEmpty()) {
+          try {
+            runners.wait();
+          } catch (InterruptedException e) {
+            Thread.interrupted(); // clear the interrupt flag and keep waiting
+          }
+          
+          if (scheduler.isTerminated())
+            break;
+                      
+          // if we reach here, then we probably got the notifyAll, but need to check if
+          // the queue is empty before really considering this is finished (SOLR-4260)
+          int queueSize = queue.size();
+          if (queueSize > 0) {
+            log.warn("No more runners, but queue still has " + queueSize
+                + " requests; adding a runner to process the remainder");
+            Runner r = new Runner();
+            runners.add(r);
+            scheduler.execute(r);
+          }
+        }
+      }
+    } finally {
+      lock.countDown();
+      lock = null;
+    }
+  }
+
+  public void handleError(Throwable ex) {
+    log.error("error", ex);
+  }
+  
+  /**
+   * Intended to be used as an extension point for doing post processing after a request completes.
+   */
+  public void onSuccess(HttpResponse resp) {
+    // no-op by design, override to add functionality
+  }
+
+  @Override
+  public void close() {
+    shutdown();
+  }
+
+  @Override
+  @Deprecated
+  public void shutdown() {
+    client.shutdown();
+    if (shutdownExecutor) {
+      scheduler.shutdown();
+      try {
+        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+          scheduler.shutdownNow();
+          if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+            log.error("ExecutorService did not terminate");
+          }
+        }
+      } catch (InterruptedException ie) {
+        scheduler.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+  
+  public void setConnectionTimeout(int timeout) {
+    HttpClientUtil.setConnectionTimeout(client.getHttpClient(), timeout);
+  }
+
+  /**
+   * Sets soTimeout (read timeout) on the underlying HttpClient. This is
+   * desirable for queries, but probably not for indexing.
+   */
+  public void setSoTimeout(int timeout) {
+    HttpClientUtil.setSoTimeout(client.getHttpClient(), timeout);
+  }
+
+  public void shutdownNow() {
+    client.shutdown();
+    if (shutdownExecutor) {
+      scheduler.shutdownNow(); // Cancel currently executing tasks
+      try {
+        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+          log.error("ExecutorService did not terminate");
+        }
+      } catch (InterruptedException ie) {
+        scheduler.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }    
+  }
+  
+  public void setParser(ResponseParser responseParser) {
+    client.setParser(responseParser);
+  }
+  
+  
+  /**
+   * @param pollQueueTime time in milliseconds for an open connection to wait
+   * for updates when the queue is empty
+   */
+  public void setPollQueueTime(int pollQueueTime) {
+    this.pollQueueTime = pollQueueTime;
+  }
+
+  public void setRequestWriter(RequestWriter requestWriter) {
+    client.setRequestWriter(requestWriter);
+  }
+}
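
For reference, a minimal usage sketch of the client above (not part of this
commit; the core URL, queue size, and thread count are illustrative
assumptions):

    import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
    import org.apache.solr.common.SolrInputDocument;

    public class AuditClientExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical audit core; buffer up to 100 requests, drain with 2 threads.
        ConcurrentUpdateSolrClient client =
            new ConcurrentUpdateSolrClient("http://localhost:8983/solr/audit", 100, 2);
        try {
          SolrInputDocument doc = new SolrInputDocument();
          doc.addField("id", "audit-1");
          client.add(doc);   // buffered; streamed by a background Runner
          client.commit();   // non-add requests drain the queue first
        } finally {
          client.close();    // also shuts down the internally managed executor
        }
      }
    }

Note that request() returns a dummy NamedList for buffered updates, so failures
surface through handleError(Throwable); override it (and onSuccess) for
application-specific handling.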

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
new file mode 100644
index 0000000..9ace82a
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import org.apache.http.client.HttpClient;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @deprecated Use {@link org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient}
+ */
+@Deprecated
+public class ConcurrentUpdateSolrServer extends ConcurrentUpdateSolrClient {
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount) {
+    super(solrServerUrl, queueSize, threadCount);
+  }
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount) {
+    super(solrServerUrl, client, queueSize, threadCount);
+  }
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount, ExecutorService es) {
+    super(solrServerUrl, client, queueSize, threadCount, es);
+  }
+
+  public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount, ExecutorService es, boolean streamDeletes) {
+    super(solrServerUrl, client, queueSize, threadCount, es, streamDeletes);
+  }
+
+}
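
Since the deprecated class only forwards its constructors, migration is a
rename (a sketch; the URL and sizes are illustrative):

    // before (deprecated)
    ConcurrentUpdateSolrServer server =
        new ConcurrentUpdateSolrServer("http://localhost:8983/solr/audit", 50, 4);

    // after (identical arguments and behaviour)
    ConcurrentUpdateSolrClient client =
        new ConcurrentUpdateSolrClient("http://localhost:8983/solr/audit", 50, 4);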

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java
new file mode 100644
index 0000000..1d370ff
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/client/solrj/impl/HttpClientConfigurer.java
@@ -0,0 +1,97 @@
+package org.apache.solr.client.solrj.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.solr.common.params.SolrParams;
+
+/**
+ * The default HTTP client configurer. If the behaviour needs to be customized,
+ * a new HttpClientConfigurer can be set by calling
+ * {@link HttpClientUtil#setConfigurer(HttpClientConfigurer)}.
+ */
+public class HttpClientConfigurer {
+  
+  public void configure(DefaultHttpClient httpClient, SolrParams config) {
+    
+    if (config.get(HttpClientUtil.PROP_MAX_CONNECTIONS) != null) {
+      HttpClientUtil.setMaxConnections(httpClient,
+          config.getInt(HttpClientUtil.PROP_MAX_CONNECTIONS));
+    }
+    
+    if (config.get(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST) != null) {
+      HttpClientUtil.setMaxConnectionsPerHost(httpClient,
+          config.getInt(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST));
+    }
+    
+    if (config.get(HttpClientUtil.PROP_CONNECTION_TIMEOUT) != null) {
+      HttpClientUtil.setConnectionTimeout(httpClient,
+          config.getInt(HttpClientUtil.PROP_CONNECTION_TIMEOUT));
+    }
+    
+    if (config.get(HttpClientUtil.PROP_SO_TIMEOUT) != null) {
+      HttpClientUtil.setSoTimeout(httpClient,
+          config.getInt(HttpClientUtil.PROP_SO_TIMEOUT));
+    }
+    
+    if (config.get(HttpClientUtil.PROP_FOLLOW_REDIRECTS) != null) {
+      HttpClientUtil.setFollowRedirects(httpClient,
+          config.getBool(HttpClientUtil.PROP_FOLLOW_REDIRECTS));
+    }
+    
+    // always call setUseRetry, whether it is in config or not
+    HttpClientUtil.setUseRetry(httpClient,
+        config.getBool(HttpClientUtil.PROP_USE_RETRY, true));
+    
+    final String basicAuthUser = config
+        .get(HttpClientUtil.PROP_BASIC_AUTH_USER);
+    final String basicAuthPass = config
+        .get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
+    HttpClientUtil.setBasicAuth(httpClient, basicAuthUser, basicAuthPass);
+    
+    if (config.get(HttpClientUtil.PROP_ALLOW_COMPRESSION) != null) {
+      HttpClientUtil.setAllowCompression(httpClient,
+          config.getBool(HttpClientUtil.PROP_ALLOW_COMPRESSION));
+    }
+    
+    boolean sslCheckPeerName = toBooleanDefaultIfNull(
+        toBooleanObject(System.getProperty(HttpClientUtil.SYS_PROP_CHECK_PEER_NAME)), true);
+    if (!sslCheckPeerName) {
+      HttpClientUtil.setHostNameVerifier(httpClient, SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+    }
+  }
+  
+  public static boolean toBooleanDefaultIfNull(Boolean bool, boolean valueIfNull) {
+    if (bool == null) {
+      return valueIfNull;
+    }
+    return bool.booleanValue();
+  }
+  
+  public static Boolean toBooleanObject(String str) {
+    if ("true".equalsIgnoreCase(str)) {
+      return Boolean.TRUE;
+    } else if ("false".equalsIgnoreCase(str)) {
+      return Boolean.FALSE;
+    }
+    // no match
+    return null;
+  }
+}
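
A sketch of how this configurer is typically exercised (the property values are
illustrative assumptions): HttpClientUtil builds a client and hands the
SolrParams to the registered configurer.

    import org.apache.http.client.HttpClient;
    import org.apache.solr.client.solrj.impl.HttpClientUtil;
    import org.apache.solr.common.params.ModifiableSolrParams;

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);     // pool-wide cap
    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 5000); // ms to connect
    params.set(HttpClientUtil.PROP_SO_TIMEOUT, 10000);        // ms read timeout
    params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, false);
    HttpClient httpClient = HttpClientUtil.createClient(params);

To change behaviour globally, install a subclass before any client is created,
e.g. HttpClientUtil.setConfigurer(new MyConfigurer()), where MyConfigurer (a
hypothetical name) overrides configure(DefaultHttpClient, SolrParams).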

