lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject lucene-solr:jira/http2: SOLR-12605: SolrCmdDistributor uses Http2SolrClient
Date Tue, 31 Jul 2018 02:31:22 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/jira/http2 5bac295be -> 7a045d16e


SOLR-12605: SolrCmdDistributor uses Http2SolrClient


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7a045d16
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7a045d16
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7a045d16

Branch: refs/heads/jira/http2
Commit: 7a045d16e8ac8f2f47e714e5c69d68472a3609ba
Parents: 5bac295
Author: Cao Manh Dat <datcm@apache.org>
Authored: Tue Jul 31 09:31:07 2018 +0700
Committer: Cao Manh Dat <datcm@apache.org>
Committed: Tue Jul 31 09:31:07 2018 +0700

----------------------------------------------------------------------
 .../apache/solr/update/SolrCmdDistributor.java  | 350 +++++++------------
 .../solr/update/StreamingSolrClients.java       | 175 ----------
 .../apache/solr/update/UpdateShardHandler.java  |  15 +-
 .../processor/DistributedUpdateProcessor.java   |  49 +--
 .../TimeRoutedAliasUpdateProcessor.java         |   8 +-
 .../processor/TolerantUpdateProcessor.java      |   6 +-
 .../solr/update/MockStreamingSolrClients.java   |  87 -----
 .../solr/update/MockingHttp2SolrClient.java     |  95 +++++
 .../solr/update/SolrCmdDistributorTest.java     |  66 ++--
 .../apache/solr/client/solrj/SolrRequest.java   |  11 +
 .../solr/client/solrj/impl/Http2SolrClient.java |  78 +++--
 .../client/solrj/impl/Http2SolrClientTest.java  |   4 +-
 .../solr/BaseDistributedSearchTestCase.java     |   9 +-
 .../java/org/apache/solr/SolrTestCaseJ4.java    |   2 +-
 .../org/apache/solr/util/SSLTestConfig.java     |  15 +
 15 files changed, 378 insertions(+), 592 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 80e2253..7437042 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -17,12 +17,15 @@
 package org.apache.solr.update;
 
 
-import org.apache.http.HttpResponse;
-import org.apache.solr.client.solrj.SolrClient;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Phaser;
+
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.BinaryResponseParser;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
@@ -32,150 +35,36 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.Diagnostics;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-
 /**
  * Used for distributing commands from a shard leader to its replicas.
  */
-public class SolrCmdDistributor implements Closeable {
+public class SolrCmdDistributor {
   private static final int MAX_RETRIES_ON_FORWARD = 25;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
-  private StreamingSolrClients clients;
-  private boolean finished = false; // see finish()
 
-  private int retryPause = 500;
-  private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
-  
   private final List<Error> allErrors = new ArrayList<>();
-  private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
-  
-  private final CompletionService<Object> completionService;
-  private final Set<Future<Object>> pending = new HashSet<>();
-  
-  public static interface AbortCheck {
-    public boolean abortCheck();
-  }
-  
+  private Http2SolrClient client;
+  private Phaser pendingTasksPhaser = new Phaser(1);
+  private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
+
   public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
-    this.clients = new StreamingSolrClients(updateShardHandler);
-    this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
+    this.client = updateShardHandler.getUpdateOnlyHttpClient();
   }
   
-  public SolrCmdDistributor(StreamingSolrClients clients, int maxRetriesOnForward, int retryPause) {
-    this.clients = clients;
+  public SolrCmdDistributor(Http2SolrClient client, int maxRetriesOnForward) {
+    this.client = client;
     this.maxRetriesOnForward = maxRetriesOnForward;
-    this.retryPause = retryPause;
-    completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
-  }
-  
-  public void finish() {    
-    try {
-      assert ! finished : "lifecycle sanity check";
-      finished = true;
-      
-      blockAndDoRetries();
-    } finally {
-      clients.shutdown();
-    }
   }
   
-  public void close() {
-    clients.shutdown();
+  public void finish() {
+    blockUntilFinished();
   }
 
-  private void doRetriesIfNeeded() {
-    // NOTE: retries will be forwards to a single url
-    
-    List<Error> errors = new ArrayList<>(this.errors);
-    errors.addAll(clients.getErrors());
-    List<Error> resubmitList = new ArrayList<>();
-
-    for (Error err : errors) {
-      try {
-        String oldNodeUrl = err.req.node.getUrl();
-        
-        // if there is a retry url, we want to retry...
-        boolean isRetry = err.req.node.checkRetry();
-        
-        boolean doRetry = false;
-        int rspCode = err.statusCode;
-        
-        if (testing_errorHook != null) Diagnostics.call(testing_errorHook,
-            err.e);
-        
-        // this can happen in certain situations such as close
-        if (isRetry) {
-          if (rspCode == 404 || rspCode == 403 || rspCode == 503) {
-            doRetry = true;
-          }
-          
-          // if it's a connect exception, lets try again
-          if (err.e instanceof SolrServerException) {
-            if (((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
-              doRetry = true;
-            }
-          }
-          
-          if (err.e instanceof ConnectException) {
-            doRetry = true;
-          }
-          
-          if (err.req.retries < maxRetriesOnForward && doRetry) {
-            err.req.retries++;
-            
-            SolrException.log(SolrCmdDistributor.log, "forwarding update to "
-                + oldNodeUrl + " failed - retrying ... retries: "
-                + err.req.retries + " " + err.req.cmd.toString() + " params:"
-                + err.req.uReq.getParams() + " rsp:" + rspCode, err.e);
-            try {
-              Thread.sleep(retryPause);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              log.warn(null, e);
-            }
-            
-            resubmitList.add(err);
-          } else {
-            allErrors.add(err);
-          }
-        } else {
-          allErrors.add(err);
-        }
-      } catch (Exception e) {
-        // continue on
-        log.error("Unexpected Error while doing request retries", e);
-      }
-    }
-    
-    clients.clearErrors();
-    this.errors.clear();
-    for (Error err : resubmitList) {
-      submit(err.req, false);
-    }
-    
-    if (resubmitList.size() > 0) {
-      blockAndDoRetries();
-    }
-  }
-  
   public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
     distribDelete(cmd, nodes, params, false, null, null);
   }
@@ -227,7 +116,7 @@ public class SolrCmdDistributor implements Closeable {
       ModifiableSolrParams params) throws IOException {
     
     // we need to do any retries before commit...
-    blockAndDoRetries();
+    blockUntilFinished();
     
     UpdateRequest uReq = new UpdateRequest();
     uReq.setParams(params);
@@ -242,23 +131,8 @@ public class SolrCmdDistributor implements Closeable {
     
   }
 
-  public void blockAndDoRetries() {
-    clients.blockUntilFinished();
-    
-    // wait for any async commits to complete
-    while (pending != null && pending.size() > 0) {
-      Future<Object> future = null;
-      try {
-        future = completionService.take();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.error("blockAndDoRetries interrupted", e);
-      }
-      if (future == null) break;
-      pending.remove(future);
-    }
-    doRetriesIfNeeded();
-
+  public void blockUntilFinished() {
+    pendingTasksPhaser.arriveAndAwaitAdvance();
   }
   
   void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -268,68 +142,115 @@ public class SolrCmdDistributor implements Closeable {
   }
 
   private void submit(final Req req, boolean isCommit) {
-    if (req.synchronous) {
-      blockAndDoRetries();
+    pendingTasksPhaser.register();
+    submit0(req, isCommit);
+  }
+
+  private void submit0(final Req req, boolean isCommit) {
 
-      try (HttpSolrClient client = new HttpSolrClient.Builder(req.node.getUrl()).withHttpClient(clients.getHttpClient()).build()) {
-        client.request(req.uReq);
-      } catch (Exception e) {
-        SolrException.log(log, e);
-        Error error = new Error();
-        error.e = e;
-        error.req = req;
-        if (e instanceof SolrException) {
-          error.statusCode = ((SolrException) e).code();
-        }
-        errors.add(error);
-      }
-      
-      return;
-    }
-    
     if (log.isDebugEnabled()) {
       log.debug("sending update to "
           + req.node.getUrl() + " retry:"
           + req.retries + " " + req.cmd + " params:" + req.uReq.getParams());
     }
-    
-    if (isCommit) {
-      // a commit using ConncurrentUpdateSolrServer is not async,
-      // so we make it async to prevent commits from happening
-      // serially across multiple nodes
-      pending.add(completionService.submit(() -> {
-        doRequest(req);
-        return null;
-      }));
+
+    try {
+      req.uReq.setBasePath(req.node.getUrl());
+      if (req.synchronous) {
+        NamedList rsp = client.request(req.uReq);
+        req.trackRequestResult(rsp, true);
+        pendingTasksPhaser.arriveAndDeregister();
+      } else {
+        //TODO write add cmds in single outputstream
+        client.request(req.uReq, null, new Http2SolrClient.OnComplete<NamedList>() {
+          @Override
+          public void onSuccess(NamedList result) {
+            req.trackRequestResult(result, true);
+            pendingTasksPhaser.arriveAndDeregister();
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            handleAndRetry(req, t, isCommit);
+          }
+        });
+      }
+    } catch (Exception e) {
+      handleAndRetry(req, e, isCommit);
+    }
+  }
+
+  private void handleAndRetry(Req req, Throwable t, boolean isCommit) {
+    SolrException.log(log, t);
+    Error error = new Error();
+    error.t = t;
+    error.req = req;
+    if (t instanceof SolrException) {
+      error.statusCode = ((SolrException) t).code();
+    }
+    if (checkRetry(error)) {
+      submit0(req, isCommit);
     } else {
-      doRequest(req);
+      req.trackRequestResult(null, false);
+      allErrors.add(error);
+      pendingTasksPhaser.arriveAndDeregister();
     }
   }
-  
-  private void doRequest(final Req req) {
-    try {
-      SolrClient solrClient = clients.getSolrClient(req);
-      solrClient.request(req.uReq);
-    } catch (Exception e) {
-      SolrException.log(log, e);
-      Error error = new Error();
-      error.e = e;
-      error.req = req;
-      if (e instanceof SolrException) {
-        error.statusCode = ((SolrException) e).code();
+
+  private boolean checkRetry(Error err) {
+    String oldNodeUrl = err.req.node.getUrl();
+
+    // if there is a retry url, we want to retry...
+    boolean isRetry = err.req.node.checkRetry();
+
+    boolean doRetry = false;
+    int rspCode = err.statusCode;
+
+    if (testing_errorHook != null) Diagnostics.call(testing_errorHook,
+        err.t);
+
+    // this can happen in certain situations such as close
+    if (isRetry) {
+      if (rspCode == 404 || rspCode == 403 || rspCode == 503) {
+        doRetry = true;
       }
-      errors.add(error);
+
+      // if it's a connect exception, lets try again
+      if (err.t instanceof SolrServerException) {
+        if (((SolrServerException) err.t).getRootCause() instanceof ConnectException) {
+          doRetry = true;
+        }
+      }
+
+      if (err.t instanceof ConnectException) {
+        doRetry = true;
+      }
+
+      if (err.req.retries < maxRetriesOnForward && doRetry) {
+        err.req.retries++;
+
+        SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+            + oldNodeUrl + " failed - retrying ... retries: "
+            + err.req.retries + " " + err.req.cmd.toString() + " params:"
+            + err.req.uReq.getParams() + " rsp:" + rspCode, err.t);
+
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      return false;
     }
   }
-  
+
   public static class Req {
     public Node node;
     public UpdateRequest uReq;
     public int retries;
-    public boolean synchronous;
     public UpdateCommand cmd;
-    final private RollupRequestReplicationTracker rollupTracker;
-    final private LeaderRequestReplicationTracker leaderTracker;
+    private final boolean synchronous;
+    private final RollupRequestReplicationTracker rollupTracker;
+    private final LeaderRequestReplicationTracker leaderTracker;
 
     public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous) {
       this(cmd, node, uReq, synchronous, null, null);
@@ -363,7 +284,7 @@ public class SolrCmdDistributor implements Closeable {
     //
     // In the case of a leaderTracker and rollupTracker both being present, then we need to take care when assembling
     // the final response to check both the rollup and leader trackers on the aggregator node.
-    public void trackRequestResult(HttpResponse resp, boolean success) {
+    public void trackRequestResult(NamedList resp, boolean success) {
 
       // Returning Integer.MAX_VALUE here means there was no "rf" on the response, therefore we just need to increment
       // our achieved rf if we are a leader, i.e. have a leaderTracker.
@@ -378,31 +299,14 @@ public class SolrCmdDistributor implements Closeable {
       }
     }
 
-    private int getRfFromResponse(HttpResponse resp) {
+    private int getRfFromResponse(NamedList resp) {
       if (resp != null) {
-
-        InputStream inputStream = null;
-
-        try {
-          inputStream = resp.getEntity().getContent();
-          BinaryResponseParser brp = new BinaryResponseParser();
-          NamedList<Object> nl = brp.processResponse(inputStream, null);
-          Object hdr = nl.get("responseHeader");
-          if (hdr != null && hdr instanceof NamedList) {
-            NamedList<Object> hdrList = (NamedList<Object>) hdr;
-            Object rfObj = hdrList.get(UpdateRequest.REPFACT);
-            if (rfObj != null && rfObj instanceof Integer) {
-              return (Integer) rfObj;
-            }
-          }
-        } catch (Exception e) {
-          log.warn("Failed to parse response from " + node + " during replication factor accounting due to: " + e);
-        } finally {
-          if (inputStream != null) {
-            try {
-              inputStream.close();
-            } catch (Exception ignore) {
-            }
+        Object hdr = resp.get("responseHeader");
+        if (hdr != null && hdr instanceof NamedList) {
+          NamedList<Object> hdrList = (NamedList<Object>) hdr;
+          Object rfObj = hdrList.get(UpdateRequest.REPFACT);
+          if (rfObj != null && rfObj instanceof Integer) {
+            return (Integer) rfObj;
           }
         }
       }
@@ -418,21 +322,15 @@ public class SolrCmdDistributor implements Closeable {
   }
   
   public static class Error {
-    public Exception e;
+    public Throwable t;
     public int statusCode = -1;
 
-    /**
-     * NOTE: This is the request that happened to be executed when this error was <b>triggered</b> the error, 
-     * but because of how {@link StreamingSolrClients} uses {@link ConcurrentUpdateSolrClient} it might not 
-     * actaully be the request that <b>caused</b> the error -- multiple requests are merged &amp; processed as 
-     * a sequential batch.
-     */
     public Req req;
     
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("SolrCmdDistributor$Error: statusCode=").append(statusCode);
-      sb.append("; exception=").append(String.valueOf(e));
+      sb.append("; exception=").append(String.valueOf(t));
       sb.append("; req=").append(String.valueOf(req));
       return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
deleted file mode 100644
index eb4caec..0000000
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.update;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
-import org.apache.solr.client.solrj.impl.BinaryResponseParser;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.update.SolrCmdDistributor.Error;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamingSolrClients {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1);
-  
-  private HttpClient httpClient;
-  
-  private Map<String, ConcurrentUpdateSolrClient> solrClients = new HashMap<>();
-  private List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
-
-  private ExecutorService updateExecutor;
-
-  private int socketTimeout;
-  private int connectionTimeout;
-
-  public StreamingSolrClients(UpdateShardHandler updateShardHandler) {
-    this.updateExecutor = updateShardHandler.getUpdateExecutor();
-    
-    httpClient = updateShardHandler.getUpdateOnlyHttpClient();
-    socketTimeout = updateShardHandler.getSocketTimeout();
-    connectionTimeout = updateShardHandler.getConnectionTimeout();
-  }
-
-  public List<Error> getErrors() {
-    return errors;
-  }
-  
-  public void clearErrors() {
-    errors.clear();
-  }
-
-  public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
-    String url = getFullUrl(req.node.getUrl());
-    ConcurrentUpdateSolrClient client = solrClients.get(url);
-    if (client == null) {
-      // NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered
-      // on a greater scale since the current behavior is to only increase the number of connections/Runners when
-      // the queue is more than half full.
-      client = new ErrorReportingConcurrentUpdateSolrClient.Builder(url, req, errors)
-          .withHttpClient(httpClient)
-          .withQueueSize(100)
-          .withThreadCount(runnerCount)
-          .withExecutorService(updateExecutor)
-          .alwaysStreamDeletes()
-          .withSocketTimeout(socketTimeout)
-          .withConnectionTimeout(connectionTimeout)
-          .build();
-      client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created
-      client.setParser(new BinaryResponseParser());
-      client.setRequestWriter(new BinaryRequestWriter());
-      Set<String> queryParams = new HashSet<>(2);
-      queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
-      queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
-      client.setQueryParams(queryParams);
-      solrClients.put(url, client);
-    }
-
-    return client;
-  }
-
-  public synchronized void blockUntilFinished() {
-    for (ConcurrentUpdateSolrClient client : solrClients.values()) {
-      client.blockUntilFinished();
-    }
-  }
-  
-  public synchronized void shutdown() {
-    for (ConcurrentUpdateSolrClient client : solrClients.values()) {
-      client.close();
-    }
-  }
-  
-  private String getFullUrl(String url) {
-    String fullUrl;
-    if (!url.startsWith("http://") && !url.startsWith("https://")) {
-      fullUrl = "http://" + url;
-    } else {
-      fullUrl = url;
-    }
-    return fullUrl;
-  }
-
-  public HttpClient getHttpClient() {
-    return httpClient;
-  }
-  
-  public ExecutorService getUpdateExecutor() {
-    return updateExecutor;
-  }
-}
-
-class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final SolrCmdDistributor.Req req;
-  private final List<Error> errors;
-  
-  public ErrorReportingConcurrentUpdateSolrClient(Builder builder) {
-    super(builder);
-    this.req = builder.req;
-    this.errors = builder.errors;
-  }
-  
-  @Override
-  public void handleError(Throwable ex) {
-    req.trackRequestResult(null, false);
-    log.error("error", ex);
-    Error error = new Error();
-    error.e = (Exception) ex;
-    if (ex instanceof SolrException) {
-      error.statusCode = ((SolrException) ex).code();
-    }
-    error.req = req;
-    errors.add(error);
-  }
-  @Override
-  public void onSuccess(HttpResponse resp) {
-    req.trackRequestResult(resp, true);
-  }
-  
-  static class Builder extends ConcurrentUpdateSolrClient.Builder {
-    protected SolrCmdDistributor.Req req;
-    protected List<Error> errors;
-    
-    public Builder(String baseSolrUrl, SolrCmdDistributor.Req req, List<Error> errors) {
-      super(baseSolrUrl);
-      this.req = req;
-      this.errors = errors;
-    }
-
-    public ErrorReportingConcurrentUpdateSolrClient build() {
-      return new ErrorReportingConcurrentUpdateSolrClient(this);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 54dfdf9..47926bc 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -28,10 +28,12 @@ import com.codahale.metrics.MetricRegistry;
 import org.apache.http.client.HttpClient;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.metrics.SolrMetricManager;
@@ -64,7 +66,7 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
   
   private ExecutorService recoveryExecutor;
   
-  private final CloseableHttpClient updateOnlyClient;
+  private final Http2SolrClient updateOnlyClient;
   
   private final CloseableHttpClient defaultClient;
 
@@ -109,7 +111,12 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
 
 
     httpRequestExecutor = new InstrumentedHttpRequestExecutor(metricNameStrategy);
-    updateOnlyClient = HttpClientUtil.createClient(clientParams, updateOnlyConnectionManager, false, httpRequestExecutor);
+    Http2SolrClient.Builder updateOnlyClientBuilder = new Http2SolrClient.Builder("");
+    if (cfg != null) {
+      updateOnlyClientBuilder.connectionTimeout(cfg.getDistributedConnectionTimeout())
+          .idleTimeout(cfg.getDistributedSocketTimeout());
+    }
+    updateOnlyClient = updateOnlyClientBuilder.build();
     defaultClient = HttpClientUtil.createClient(clientParams, defaultConnectionManager, false, httpRequestExecutor);
 
     // following is done only for logging complete configuration.
@@ -174,7 +181,7 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
   }
   
   // don't introduce a bug, this client is for sending updates only!
-  public HttpClient getUpdateOnlyHttpClient() {
+  public Http2SolrClient getUpdateOnlyHttpClient() {
     return updateOnlyClient;
   }
   
@@ -208,7 +215,7 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
     } catch (Exception e) {
       SolrException.log(log, e);
     } finally {
-      HttpClientUtil.close(updateOnlyClient);
+      IOUtils.closeQuietly(updateOnlyClient);
       HttpClientUtil.close(defaultClient);
       updateOnlyConnectionManager.close();
       defaultConnectionManager.close();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 8d715a6..4449ff3 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -38,7 +38,6 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrRequest.METHOD;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.GenericSolrRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.SimpleSolrResponse;
@@ -676,19 +675,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
       }
 
-      if (cmd.isInPlaceUpdate()) {
-        params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
-
-        // Use synchronous=true so that a new connection is used, instead
-        // of the update being streamed through an existing streaming client.
-        // When using a streaming client, the previous update
-        // and the current in-place update (that depends on the previous update), if reordered
-        // in the stream, can result in the current update being bottled up behind the previous
-        // update in the stream and can lead to degraded performance.
-        cmdDistrib.distribAdd(cmd, nodes, params, true, rollupReplicationTracker, leaderReplicationTracker);
-      } else {
-        cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
-      }
+      cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
     }
 
     // TODO: what to do when no idField?
@@ -753,9 +740,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
   @Override
   protected void doClose() {
-    if (cmdDistrib != null) {
-      cmdDistrib.close();
-    }
   }
  
   // TODO: optionally fail if n replicas are not reached...
@@ -792,7 +776,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       // for now we don't error - we assume if it was added locally, we
       // succeeded 
       if (log.isWarnEnabled()) {
-        log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e);
+        log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.t);
       }
       
       // Since it is not a forward request, for each fail, try to tell them to
@@ -811,11 +795,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       final String replicaUrl = error.req.node.getUrl();
 
       // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
-      String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
+      String cause = (error.t instanceof SolrException) ? ((SolrException)error.t).getMetadata("cause") : null;
       if ("LeaderChanged".equals(cause)) {
         // let's just fail this request and let the client retry? or just call processAdd again?
         log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
-            " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
+            " now thinks it is the leader! Failing the request to let the client retry! "+error.t);
         errorsForClient.add(error);
         continue;
       }
@@ -871,7 +855,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
             // if false, then the node is probably not "live" anymore
             // and we do not need to send a recovery message
-            Throwable rootCause = SolrException.getRootCause(error.e);
+            Throwable rootCause = SolrException.getRootCause(error.t);
             if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
               log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
               replicasShouldBeInLowerTerms.add(coreNodeName);
@@ -1300,9 +1284,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
 
     NamedList<Object> rsp = null;
-    try (HttpSolrClient hsc = new HttpSolrClient.Builder(leaderUrl).
-        withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).build()) {
-      rsp = hsc.request(ur);
+    try {
+      ur.setBasePath(leaderUrl);
+      rsp = updateShardHandler.getUpdateOnlyHttpClient().request(ur);
     } catch (SolrServerException e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Error during fetching [" + id +
           "] from leader (" + leaderUrl + "): ", e);
@@ -1641,7 +1625,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       }
 
       if (someReplicas)  {
-        cmdDistrib.blockAndDoRetries();
+        cmdDistrib.blockUntilFinished();
       }
     }
 
@@ -1924,7 +1908,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             zkController.getBaseUrl(), req.getCore().getName()));
         if (nodes != null) {
           cmdDistrib.distribCommit(cmd, nodes, params);
-          cmdDistrib.blockAndDoRetries();
+          cmdDistrib.blockUntilFinished();
         }
       }
     }
@@ -2012,8 +1996,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       // create a merged copy of the metadata from all wrapped exceptions
       NamedList<String> metadata = new NamedList<String>();
       for (Error error : errors) {
-        if (error.e instanceof SolrException) {
-          SolrException e = (SolrException) error.e;
+        if (error.t instanceof SolrException) {
+          SolrException e = (SolrException) error.t;
           NamedList<String> eMeta = e.getMetadata();
           if (null != eMeta) {
             metadata.addAll(eMeta);
@@ -2054,12 +2038,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       assert 0 < errors.size();
       
       if (1 == errors.size()) {
-        return "Async exception during distributed update: " + errors.get(0).e.getMessage();
+        return "Async exception during distributed update: " + errors.get(0).t.getMessage();
       } else {
         StringBuilder buf = new StringBuilder(errors.size() + " Async exceptions during distributed update: ");
         for (Error error : errors) {
           buf.append("\n");
-          buf.append(error.e.getMessage());
+          buf.append(error.t.getMessage());
         }
         return buf.toString();
       }
@@ -2146,6 +2130,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // we have a replication factor of one by default.
     private int achievedRf = 1;
     private final int requestedRf;
+    private Set<String> nodes = new HashSet<>();
 
     private final String myShardId;
 
@@ -2170,7 +2155,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       }
 
       if (success) {
-        ++achievedRf;
+        synchronized (this) {
+          if (nodes.add(node.getUrl())) ++achievedRf;
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
index 1d2d730..a4e9a02 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -336,7 +336,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
     final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
     cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
-    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
+    cmdDistrib.blockUntilFinished(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
   }
 
 // Not supported by SolrCmdDistributor and is sketchy any way
@@ -359,11 +359,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
 
   @Override
   protected void doClose() {
-    try {
-      cmdDistrib.close();
-    } finally {
-      super.doClose();
-    }
+    super.doClose();
   }
 
   private SolrCmdDistributor.Node routeDocToSlice(String collection, SolrInputDocument doc) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
index 2f4de12..aa189fc 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java
@@ -247,11 +247,11 @@ public class TolerantUpdateProcessor extends UpdateRequestProcessor {
         //
         // instead we trust the metadata that the TolerantUpdateProcessor running on the remote node added
         // to the exception when it failed.
-        if ( ! (error.e instanceof SolrException) ) {
-          log.error("async update exception is not SolrException, no metadata to process", error.e);
+        if ( ! (error.t instanceof SolrException) ) {
+          log.error("async update exception is not SolrException, no metadata to process", error.t);
           continue;
         }
-        SolrException remoteErr = (SolrException) error.e;
+        SolrException remoteErr = (SolrException) error.t;
         NamedList<String> remoteErrMetadata = remoteErr.getMetadata();
 
         if (null == remoteErrMetadata) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
deleted file mode 100644
index 72d39ff..0000000
--- a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.update;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.common.util.NamedList;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketException;
-
-public class MockStreamingSolrClients extends StreamingSolrClients {
-  
-  public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION};
-  
-  private volatile Exp exp = null;
-  
-  public MockStreamingSolrClients(UpdateShardHandler updateShardHandler) {
-    super(updateShardHandler);
-  }
-  
-  @Override
-  public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
-    SolrClient client = super.getSolrClient(req);
-    return new MockSolrClient(client);
-  }
-  
-  public void setExp(Exp exp) {
-    this.exp = exp;
-  }
-
-  private IOException exception() {
-    switch (exp) {
-      case CONNECT_EXCEPTION:
-        return new ConnectException();
-      case SOCKET_EXCEPTION:
-        return new SocketException();
-      default:
-        break;
-    }
-    return null;
-  }
-
-  class MockSolrClient extends SolrClient {
-
-    private SolrClient solrClient;
-
-    public MockSolrClient(SolrClient solrClient) {
-      this.solrClient = solrClient;
-    }
-    
-    @Override
-    public NamedList<Object> request(SolrRequest request, String collection)
-        throws SolrServerException, IOException {
-      if (exp != null) {
-        if (LuceneTestCase.random().nextBoolean()) {
-          throw exception();
-        } else {
-          throw new SolrServerException(exception());
-        }
-      }
-      
-      return solrClient.request(request);
-    }
-
-    @Override
-    public void close() {}
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
new file mode 100644
index 0000000..b843709
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.common.util.NamedList;
+
+/** Test client that, while an {@link Exp} is set, fails every request instead of sending it. */
+public class MockingHttp2SolrClient extends Http2SolrClient {
+  /** Kinds of I/O failure this client can simulate. */
+  public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION}
+
+  // Failure to simulate, or null to pass requests through; volatile so setExp is seen by requests.
+  private volatile Exp exp = null;
+
+  public MockingHttp2SolrClient(String baseSolrUrl, Builder builder) {
+    super(baseSolrUrl, builder);
+  }
+
+  /** Builder that applies the distributed connection/socket timeouts of an UpdateShardHandlerConfig. */
+  public static class Builder extends Http2SolrClient.Builder {
+    public Builder(String baseSolrUrl, UpdateShardHandlerConfig config) {
+      super(baseSolrUrl);
+      this.connectionTimeout(config.getDistributedConnectionTimeout());
+      this.idleTimeout(config.getDistributedSocketTimeout());
+    }
+
+    public MockingHttp2SolrClient build() {
+      return new MockingHttp2SolrClient(baseSolrUrl, this);
+    }
+  }
+
+  /** Selects the failure to simulate; pass {@code null} to let requests through again. */
+  public void setExp(Exp exp) {
+    this.exp = exp;
+  }
+
+  /** Maps a non-null failure kind to a fresh exception instance. */
+  private static IOException exception(Exp kind) {
+    switch (kind) {
+      case CONNECT_EXCEPTION:
+        return new ConnectException();
+      case SOCKET_EXCEPTION:
+        return new SocketException();
+      default:
+        throw new IllegalStateException("Unhandled Exp: " + kind);
+    }
+  }
+
+  // Throws the simulated failure if one is set, randomly either raw or wrapped in a
+  // SolrServerException. exp is read exactly once so a concurrent setExp(null) cannot cause an NPE.
+  private void failIfRequested() throws SolrServerException, IOException {
+    final Exp kind = exp;
+    if (kind == null) return;
+    if (LuceneTestCase.random().nextBoolean()) throw exception(kind);
+    throw new SolrServerException(exception(kind));
+  }
+
+  @Override
+  public NamedList<Object> request(SolrRequest request, String collection)
+      throws SolrServerException, IOException {
+    failIfRequested();
+    // Forward collection as well: the one-argument overload called here before discarded it.
+    return super.request(request, collection);
+  }
+
+  @Override
+  public Http2ClientResponse request(SolrRequest solrRequest, String collection, OnComplete onComplete)
+      throws SolrServerException, IOException {
+    failIfRequested();
+    return super.request(solrRequest, collection, onComplete);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 1699b0d..551aed5 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -45,7 +45,6 @@ import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrEventListener;
 import org.apache.solr.index.LogDocMergePolicyFactory;
 import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.MockStreamingSolrClients.Exp;
 import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.apache.solr.update.SolrCmdDistributor.Node;
 import org.apache.solr.update.SolrCmdDistributor.RetryNode;
@@ -58,9 +57,9 @@ import org.xml.sax.SAXException;
 
 // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
-  
+
   private AtomicInteger id = new AtomicInteger();
-  
+
   @BeforeClass
   public static void beforeClass() throws Exception {
     // we can't use the Randomized merge policy because the test depends on
@@ -74,22 +73,21 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
   }
 
   private UpdateShardHandler updateShardHandler;
-  
+
   public SolrCmdDistributorTest() throws ParserConfigurationException, IOException, SAXException {
     updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
-    
     stress = 0;
   }
 
   public static String getSchemaFile() {
     return "schema.xml";
   }
-  
+
   public static  String getSolrConfigFile() {
     // use this because it has /update and is minimal
     return "solrconfig-tlog.xml";
   }
-  
+
   // TODO: for now we redefine this method so that it pulls from the above
   // we don't get helpful override behavior due to the method being static
   @Override
@@ -141,8 +139,8 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     HttpSolrClient client;
     ZkNodeProps nodeProps;
 
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
-
+    {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
       nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
           ((HttpSolrClient) controlClient).getBaseURL(),
           ZkStateReader.CORE_NAME_PROP, "");
@@ -175,7 +173,8 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     }
     int id2;
     // add another 2 docs to control and 3 to client
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
+    {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
       cmd.solrDoc = sdoc("id", id.incrementAndGet());
       params = new ModifiableSolrParams();
       params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -218,8 +217,8 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     dcmd.id = Integer.toString(id2);
 
 
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
-
+    {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
       params = new ModifiableSolrParams();
       params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
 
@@ -250,8 +249,8 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       //System.out.println(clients.get(0).request(new LukeRequest()));
     }
 
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
-
+    {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
       int cnt = atLeast(303);
       for (int i = 0; i < cnt; i++) {
         nodes.clear();
@@ -324,19 +323,20 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
             ((NamedList<Object>) resp.get("index")).get("maxDoc"));
       }
     }
-    
+
     testMaxRetries();
     testOneRetry();
     testRetryNodeAgainstBadAddress();
     testRetryNodeWontRetrySocketError();
-    
+
     testDistribOpenSearcher();
   }
 
   private void testMaxRetries() throws IOException {
-    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
-      streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+    final MockingHttp2SolrClient.Builder clientBuilder = new MockingHttp2SolrClient.Builder("", UpdateShardHandlerConfig.DEFAULT);
+    try (MockingHttp2SolrClient solrClient = clientBuilder.build()) {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(solrClient, 5);
+      solrClient.setExp(MockingHttp2SolrClient.Exp.CONNECT_EXCEPTION);
       ArrayList<Node> nodes = new ArrayList<>();
       final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0);
 
@@ -364,14 +364,15 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       assertEquals(1, cmdDistrib.getErrors().size());
     }
   }
-  
+
   private void testOneRetry() throws Exception {
     final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
     long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
-    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
-      streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+    final MockingHttp2SolrClient.Builder clientBuilder = new MockingHttp2SolrClient.Builder("", UpdateShardHandlerConfig.DEFAULT);
+    try (MockingHttp2SolrClient solrClient = clientBuilder.build()) {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(solrClient, 5);
+      solrClient.setExp(MockingHttp2SolrClient.Exp.CONNECT_EXCEPTION);
       ArrayList<Node> nodes = new ArrayList<>();
 
       ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
@@ -382,7 +383,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
         @Override
         public boolean checkRetry() {
-          streamingClients.setExp(null);
+          solrClient.setExp(null);
           retries.incrementAndGet();
           return true;
         }
@@ -416,9 +417,10 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
     long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
-    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
-      streamingClients.setExp(Exp.SOCKET_EXCEPTION);
+    final MockingHttp2SolrClient.Builder clientBuilder = new MockingHttp2SolrClient.Builder("", UpdateShardHandlerConfig.DEFAULT);
+    try (MockingHttp2SolrClient solrClient = clientBuilder.build()) {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(solrClient, 5);
+      solrClient.setExp(MockingHttp2SolrClient.Exp.SOCKET_EXCEPTION);
       ArrayList<Node> nodes = new ArrayList<>();
 
       ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
@@ -444,7 +446,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
       cmdDistrib.distribAdd(cmd, nodes, params);
 
-      streamingClients.setExp(null);
+      solrClient.setExp(null);
       cmdDistrib.distribCommit(ccmd, nodes, params);
       cmdDistrib.finish();
 
@@ -463,7 +465,8 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
 
   private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
     // Test RetryNode
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
+    {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
       final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
       long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
           .getNumFound();
@@ -513,7 +516,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       }
     }
   }
-  
+
   @Override
   public void distribTearDown() throws Exception {
     updateShardHandler.close();
@@ -521,7 +524,8 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
   }
 
   private void testDistribOpenSearcher() {
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
+    {
+      SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
       UpdateRequest updateRequest = new UpdateRequest();
 
       CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java
index 7dbaab9..e819384 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java
@@ -77,6 +77,8 @@ public abstract class SolrRequest<T extends SolrResponse> implements Serializabl
 
   private String basicAuthUser, basicAuthPwd;
 
+  private String basePath;
+
   public SolrRequest setBasicAuthCredentials(String user, String password) {
     this.basicAuthUser = user;
     this.basicAuthPwd = password;
@@ -215,4 +217,13 @@ public abstract class SolrRequest<T extends SolrResponse> implements Serializabl
     return getParams() == null ? null : getParams().get("collection");
   }
 
+  /** Sets the base URL for this request; {@code null} clears it. One trailing '/' is removed. */
+  public void setBasePath(String path) {
+    if (path != null && path.endsWith("/")) path = path.substring(0, path.length() - 1);
+    this.basePath = path;
+  }
+
+  public String getBasePath() {
+    return basePath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 7db4d8c..ebcde3e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -94,6 +94,39 @@ import static org.apache.solr.common.util.Utils.getObjectByPath;
 public class Http2SolrClient extends SolrClient {
   private static volatile SslContextFactory defaultSslContextFactory;
 
+  static {
+    // Seed the shared default SSL context factory from the javax.net.ssl.* system properties;
+    // a property that is not set leaves the matching factory setting untouched.
+    defaultSslContextFactory = new SslContextFactory(false);
+    String keyStore = System.getProperty("javax.net.ssl.keyStore");
+    if (keyStore != null) {
+      defaultSslContextFactory.setKeyStorePath(keyStore);
+    }
+    String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword");
+    if (keyStorePassword != null) {
+      defaultSslContextFactory.setKeyStorePassword(keyStorePassword);
+    }
+    String trustStore = System.getProperty("javax.net.ssl.trustStore");
+    if (trustStore != null) {
+      defaultSslContextFactory.setTrustStorePath(trustStore);
+    }
+    String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
+    if (trustStorePassword != null) {
+      defaultSslContextFactory.setTrustStorePassword(trustStorePassword);
+    }
+
+    // Peer-name checking stays enabled unless the property is explicitly "false". The earlier
+    // test (== null && "false".equalsIgnoreCase(...)) could never be true, so it was ignored.
+    String checkPeerNameStr = System.getProperty(HttpClientUtil.SYS_PROP_CHECK_PEER_NAME);
+    boolean sslCheckPeerName = !"false".equalsIgnoreCase(checkPeerNameStr);
+    if (System.getProperty("tests.jettySsl.clientAuth") != null) {
+      sslCheckPeerName = sslCheckPeerName || Boolean.getBoolean("tests.jettySsl.clientAuth");
+    }
+    // NOTE(review): the peer-name flag is what feeds setNeedClientAuth -- confirm this is intended.
+    defaultSslContextFactory.setNeedClientAuth(sslCheckPeerName);
+
+  }
+
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final int MAX_OUTSTANDING_REQUESTS = 1000;
   private static final String AGENT = "Solr[" + Http2SolrClient.class.getName() + "] 2.0";
@@ -147,7 +180,7 @@ public class Http2SolrClient extends SolrClient {
 
   private boolean closeClient;
 
-  private Http2SolrClient(String serverBaseUrl, Builder builder) {
+  protected Http2SolrClient(String serverBaseUrl, Builder builder) {
     // TODO: what about shared instances?
     available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
 
@@ -237,7 +270,7 @@ public class Http2SolrClient extends SolrClient {
     }
   }
 
-  private Http2ClientResponse request(SolrRequest solrRequest, String collection, OnComplete onComplete)
+  public Http2ClientResponse request(SolrRequest solrRequest, String collection, OnComplete onComplete)
       throws SolrServerException, IOException {
     return request(solrRequest, collection, onComplete, false);
   }
@@ -296,14 +329,14 @@ public class Http2SolrClient extends SolrClient {
         }
         return arsp;
       }
-    } catch(InterruptedException e) {
+    } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
-    } catch(TimeoutException e) {
+    } catch (TimeoutException e) {
       throw new SolrServerException(
           "Timeout occured while waiting response from server at: "
               + getBaseURL(), e);
-    } catch(ExecutionException e) {
+    } catch (ExecutionException e) {
       Throwable cause = e.getCause();
       if (cause instanceof ConnectException) {
         throw new SolrServerException("Server refused connection at: "
@@ -348,7 +381,7 @@ public class Http2SolrClient extends SolrClient {
 
     //TODO add invariantParams support
 
-    String basePath = serverBaseUrl;
+    String basePath = solrRequest.getBasePath() == null ? serverBaseUrl : solrRequest.getBasePath();
     if (collection != null)
       basePath += "/" + collection;
 
@@ -525,12 +558,13 @@ public class Http2SolrClient extends SolrClient {
         NamedList err = (NamedList) rsp.get("error");
         if (err != null) {
           reason = (String) err.get("msg");
-          if(reason == null) {
+          if (reason == null) {
             reason = (String) err.get("trace");
           }
-          metadata = (NamedList<String>)err.get("metadata");
+          metadata = (NamedList<String>) err.get("metadata");
         }
-      } catch (Exception ex) {}
+      } catch (Exception ex) {
+      }
       if (reason == null) {
         StringBuilder msg = new StringBuilder();
         msg.append(response.getReason())
@@ -539,7 +573,8 @@ public class Http2SolrClient extends SolrClient {
             .append(response.getRequest().getMethod());
         try {
           reason = java.net.URLDecoder.decode(msg.toString(), UTF_8);
-        } catch (UnsupportedEncodingException e) {}
+        } catch (UnsupportedEncodingException e) {
+        }
       }
       RemoteSolrException rss = new RemoteSolrException(serverBaseUrl, httpStatus, reason, null);
       if (metadata != null) rss.setMetadata(metadata);
@@ -622,7 +657,7 @@ public class Http2SolrClient extends SolrClient {
     private Integer idleTimeout;
     private Integer connectionTimeout;
     private boolean useHttp1_1 = false;
-    private String baseSolrUrl;
+    protected String baseSolrUrl;
 
     public Builder(String baseSolrUrl) {
       this.baseSolrUrl = baseSolrUrl;
@@ -665,14 +700,10 @@ public class Http2SolrClient extends SolrClient {
    */
   public static class RemoteSolrException extends SolrException {
     /**
-     * @param remoteHost
-     *          the host the error was received from
-     * @param code
-     *          Arbitrary HTTP status code
-     * @param msg
-     *          Exception Message
-     * @param th
-     *          Throwable to wrap with this Exception
+     * @param remoteHost the host the error was received from
+     * @param code       Arbitrary HTTP status code
+     * @param msg        Exception Message
+     * @param th         Throwable to wrap with this Exception
      */
     public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
       super(code, "Error from server at " + remoteHost + ": " + msg, th);
@@ -709,7 +740,7 @@ public class Http2SolrClient extends SolrClient {
     }
   }
 
-  private static class Http2ClientResponse {
+  protected static class Http2ClientResponse {
     NamedList response;
     InputStream stream;
   }
@@ -720,9 +751,10 @@ public class Http2SolrClient extends SolrClient {
 
   /**
    * Expert Method
+   *
    * @param queryParams set of param keys to only send via the query string
-   * Note that the param will be sent as a query string if the key is part
-   * of this Set or the SolrRequest's query params.
+   *                    Note that the param will be sent as a query string if the key is part
+   *                    of this Set or the SolrRequest's query params.
    * @see org.apache.solr.client.solrj.SolrRequest#getQueryParams
    */
   public void setQueryParams(Set<String> queryParams) {
@@ -734,7 +766,7 @@ public class Http2SolrClient extends SolrClient {
     ModifiableSolrParams queryModParams = new ModifiableSolrParams();
     if (queryParamNames != null) {
       for (String param : queryParamNames) {
-        String[] value = wparams.getParams(param) ;
+        String[] value = wparams.getParams(param);
         if (value != null) {
           for (String v : value) {
             queryModParams.add(param, v);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
index 1dd67ca..22c84b1 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
@@ -27,13 +27,13 @@ import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.http.ParseException;
 import org.apache.solr.SolrJettyTestBase;
-import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -100,7 +100,7 @@ public class Http2SolrClientTest extends SolrJettyTestBase {
       headers = new HashMap<>();
       while (headerNames.hasMoreElements()) {
         final String name = headerNames.nextElement();
-        headers.put(name.toLowerCase(), req.getHeader(name));
+        headers.put(name.toLowerCase(Locale.getDefault()), req.getHeader(name));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
index 4a67581..4bb9a29 100644
--- a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
@@ -49,7 +49,6 @@ import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettyConfig;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
@@ -439,8 +438,12 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
   protected SolrClient createNewSolrClient(int port) {
     try {
       // setup the client...
-      HttpSolrClient client = getHttpSolrClient(buildUrl(port) + "/" + DEFAULT_TEST_CORENAME);
-      return client;
+      String baseUrl = buildUrl(port);
+      if (baseUrl.endsWith("/")) {
+        return getHttpSolrClient(baseUrl + DEFAULT_TEST_CORENAME);
+      } else {
+        return getHttpSolrClient(baseUrl + "/" + DEFAULT_TEST_CORENAME);
+      }
     }
     catch (Exception ex) {
       throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 061bf8c..f76432a 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -287,7 +287,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
     sslConfig = buildSSLConfig();
     // based on randomized SSL config, set SchemaRegistryProvider appropriately
     HttpClientUtil.setSchemaRegistryProvider(sslConfig.buildClientSchemaRegistryProvider());
-    Http2SolrClient.setSslContextFactory(SSLConfig.createContextFactory(sslConfig));
+    Http2SolrClient.setSslContextFactory(sslConfig.buildSslContextFactory());
     if(isSSLMode()) {
       // SolrCloud tests should usually clear this
       System.setProperty("urlScheme", "https");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a045d16/solr/test-framework/src/java/org/apache/solr/util/SSLTestConfig.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/util/SSLTestConfig.java b/solr/test-framework/src/java/org/apache/solr/util/SSLTestConfig.java
index 3b03f6e..7d3f69b 100644
--- a/solr/test-framework/src/java/org/apache/solr/util/SSLTestConfig.java
+++ b/solr/test-framework/src/java/org/apache/solr/util/SSLTestConfig.java
@@ -147,6 +147,21 @@ public class SSLTestConfig extends SSLConfig {
       return HTTP_ONLY_SCHEMA_PROVIDER;
     }
   }
+
+  public SslContextFactory buildSslContextFactory() {
+    if (!isSSLMode()) {
+      return null;
+    }
+    SslContextFactory sslContextFactory = new SslContextFactory(false);
+    try {
+      SSLContext sslContext = buildClientSSLContext();
+      sslContextFactory.setSslContext(sslContext);
+      sslContextFactory.setNeedClientAuth(checkPeerName);
+    } catch (KeyManagementException | UnrecoverableKeyException | NoSuchAlgorithmException | KeyStoreException e) {
+      throw new IllegalStateException("Unable to setup https scheme for HTTPClient to test SSL.", e);
+    }
+    return sslContextFactory;
+  }
   
   /**
    * Builds a new SSLContext for HTTP <b>clients</b> to use when communicating with servers which have 


Mime
View raw message