From commits-return-104437-archive-asf-public=cust-asf.ponee.io@lucene.apache.org Thu Oct 25 09:31:28 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E6B06180670 for ; Thu, 25 Oct 2018 09:31:27 +0200 (CEST) Received: (qmail 65380 invoked by uid 500); 25 Oct 2018 07:31:26 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 65371 invoked by uid 99); 25 Oct 2018 07:31:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Oct 2018 07:31:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B3BEEE0A35; Thu, 25 Oct 2018 07:31:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: datcm@apache.org To: commits@lucene.apache.org Message-Id: <66fd935834594b24b61b1de14899bcde@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: lucene-solr:jira/http2: Http2SolrClient support async as default Date: Thu, 25 Oct 2018 07:31:26 +0000 (UTC) Repository: lucene-solr Updated Branches: refs/heads/jira/http2 814bf425e -> e3ee220ab Http2SolrClient support async as default Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e3ee220a Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e3ee220a Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e3ee220a Branch: refs/heads/jira/http2 Commit: e3ee220abe23e513752bbb76e35c075536f62d3c Parents: 814bf42 Author: Cao Manh Dat Authored: Thu Oct 25 14:31:18 2018 +0700 Committer: Cao Manh Dat Committed: Thu Oct 25 14:31:18 2018 +0700 ---------------------------------------------------------------------- .../solr/client/solrj/impl/Http2SolrClient.java | 173 +++++++------------ 1 file changed, 67 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3ee220a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java index 8d73021..54852d5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java @@ -32,11 +32,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.IOUtils; import org.apache.http.HttpStatus; @@ -212,19 +214,13 @@ public class Http2SolrClient extends SolrClient { assert ObjectReleaseTracker.release(this); } - public void request(SolrRequest solrRequest, String collection, OnComplete onComplete) - throws SolrServerException, IOException { - request(solrRequest, collection, onComplete, false); - } - private boolean isV2ApiRequest(final SolrRequest request) { return request instanceof V2Request || request.getPath().contains("/____v2"); } - private Http2ClientResponse request(SolrRequest solrRequest, + public void request(SolrRequest solrRequest, String collection, - OnComplete onComplete, - boolean returnStream) throws IOException, SolrServerException { + OnComplete onComplete) throws IOException, SolrServerException { Request req = makeRequest(solrRequest, collection); setBasicAuthHeader(solrRequest, req); for (HttpListenerFactory factory : listenerFactory) { @@ -234,96 +230,32 @@ public class Http2SolrClient extends SolrClient { req.onComplete(listener); } - try { - if (onComplete != null) { - req.onRequestQueued(asyncTracker.queuedListener) - .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener() { - - @Override - public void onComplete(Result result) { - if (result.isFailed()) { - onComplete.onFailure(result.getFailure()); - return; - } + req.onRequestQueued(asyncTracker.queuedListener) + // maximum 6MB for buffering response + .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(6*1024*1024) { - // TODO: should we stream this? - try (InputStream ris = getContentAsInputStream()) { - NamedList rsp; - try { - rsp = processErrorsAndResponse(result.getResponse(), - parser, ris, getEncoding(), isV2ApiRequest(solrRequest)); - onComplete.onSuccess(rsp); - } catch (Exception e) { - onComplete.onFailure(e); - } - } catch (IOException e1) { + @Override + public void onComplete(Result result) { + if (result.isFailed()) { + if (onComplete != null) { + onComplete.onFailure(result.getFailure()); + } + } else { + try (InputStream ris = getContentAsInputStream()) { + NamedList rsp; + rsp = processErrorsAndResponse(result.getResponse(), + parser, ris, getEncoding(), isV2ApiRequest(solrRequest)); + if (onComplete != null) + onComplete.onSuccess(rsp); + } catch (Exception e1) { + if (onComplete != null) { onComplete.onFailure(e1); } } - }); - return null; - } else { - Http2ClientResponse arsp = new Http2ClientResponse(); - if (returnStream) { - InputStreamResponseListener listener = new InputStreamResponseListener(); - req.send(listener); - // Wait for the response headers to arrive - listener.get(idleTimeout, TimeUnit.SECONDS); - // TODO: process response - arsp.stream = listener.getInputStream(); - } else { - ContentResponse response = senReqSync(req); - ByteArrayInputStream is = new ByteArrayInputStream(response.getContent()); - arsp.response = processErrorsAndResponse(response, parser, - is, response.getEncoding(), isV2ApiRequest(solrRequest)); } - return arsp; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (TimeoutException e) { - throw new SolrServerException( - "Timeout occured while waiting response from server at: " - + getBaseURL(), e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof ConnectException) { - throw new SolrServerException("Server refused connection at: " - + getBaseURL(), cause); - } - if (cause instanceof SolrServerException) { - throw (SolrServerException) cause; - } else if (cause instanceof IOException) { - throw new SolrServerException( - "IOException occured when talking to server at: " + getBaseURL(), cause); - } - throw new SolrServerException(cause.getMessage(), cause); - } - } + }); - private ContentResponse senReqSync(Request req) throws InterruptedException, TimeoutException, ExecutionException { - // req.send() method will throw exception when response is more than 2MB, - // by passing a responseListener we can overcome the problem, default buffer size is 20MB - FutureResponseListener listener = new FutureResponseListener(req, 20*1024*1024); - req.send(listener); - try { - return listener.get(); - } catch (ExecutionException x) { - // the exception handling is copied from HttpRequest.send() - if (x.getCause() instanceof TimeoutException) - { - TimeoutException t = (TimeoutException) (x.getCause()); - req.abort(t); - throw t; - } - - req.abort(x); - throw x; - } catch (Throwable x) { - req.abort(x); - throw x; - } } private void setBasicAuthHeader(SolrRequest solrRequest, Request req) throws UnsupportedEncodingException { @@ -567,25 +499,59 @@ public class Http2SolrClient extends SolrClient { return rsp; } + private static class SyncOnComplete implements OnComplete { + + CountDownLatch latch = new CountDownLatch(1); + NamedList result; + Throwable t; + + + @Override + public void onSuccess(NamedList result) { + this.result = result; + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + this.t = e; + latch.countDown(); + } + + void waitForResult() throws InterruptedException { + latch.await(); + } + } + @Override public NamedList request(SolrRequest request, String collection) throws SolrServerException, IOException { - return request(request, collection, null, false).response; + SyncOnComplete syncOnComplete = new SyncOnComplete(); + request(request, collection, syncOnComplete); + try { + syncOnComplete.waitForResult(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + if (syncOnComplete.result != null) { + return syncOnComplete.result; + } + if (syncOnComplete.t != null) { + if (syncOnComplete.t instanceof IOException) { + throw new SolrServerException( + "IOException occured when talking to server at: " + getBaseURL(), syncOnComplete.t); + } + throw new SolrServerException(syncOnComplete.t.getMessage(), syncOnComplete.t); + } + throw new IllegalStateException("Request did not return an exception or result"); } public void setRequestWriter(RequestWriter requestWriter) { this.requestWriter = requestWriter; } - private InputStream queryAndStreamResponse(String collection, SolrParams params) - throws SolrServerException, IOException { - QueryRequest queryRequest = new QueryRequest(params); - Http2ClientResponse resp = request(queryRequest, collection, null, true); - assert resp.stream != null; - return resp.stream; - } - public interface OnComplete { - void onSuccess(NamedList result); + void onSuccess(NamedList result); void onFailure(Throwable e); } @@ -737,11 +703,6 @@ public class Http2SolrClient extends SolrClient { } } - protected static class Http2ClientResponse { - NamedList response; - InputStream stream; - } - public Set getQueryParams() { return queryParams; }