lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From no...@apache.org
Subject lucene-solr:branch_7x: SOLR-11380: SolrJ must stream docs to server instead of writing to a buffer first
Date Tue, 31 Oct 2017 04:16:40 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 83cac87ab -> 4b316ec13


SOLR-11380: SolrJ must stream docs to server instead of writing to a buffer first


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4b316ec1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4b316ec1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4b316ec1

Branch: refs/heads/branch_7x
Commit: 4b316ec1397431525e25f4c7ceba23c636e71209
Parents: 83cac87
Author: Noble Paul <noble@apache.org>
Authored: Tue Oct 31 13:16:31 2017 +1030
Committer: Noble Paul <noble@apache.org>
Committed: Tue Oct 31 14:46:03 2017 +1030

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../handler/BinaryUpdateRequestHandlerTest.java |  11 +-
 .../client/solrj/impl/BinaryRequestWriter.java  |  81 +++-----
 .../solr/client/solrj/impl/HttpSolrClient.java  | 206 +++++++++++--------
 .../client/solrj/request/RequestWriter.java     |  38 +++-
 5 files changed, 192 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4b316ec1/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6759142..60a44c1 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -96,6 +96,8 @@ Other Changes
 
 * SOLR-11562: Restore Solr logo ASCII-art in startup log by removing unnecessary default confdir logging (janhoy)
 
+* SOLR-11380: SolrJ must stream docs to server instead of writing to a buffer first (noble)
+
 ==================  7.1.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4b316ec1/solr/core/src/test/org/apache/solr/handler/BinaryUpdateRequestHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/BinaryUpdateRequestHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/BinaryUpdateRequestHandlerTest.java
index dfece60..0c02109 100644
--- a/solr/core/src/test/org/apache/solr/handler/BinaryUpdateRequestHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/BinaryUpdateRequestHandlerTest.java
@@ -16,11 +16,15 @@
  */
 package org.apache.solr.handler;
 
+import java.io.ByteArrayOutputStream;
+
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
+import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ContentStreamBase;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.handler.loader.ContentStreamLoader;
 import org.apache.solr.request.SolrQueryRequest;
@@ -54,8 +58,11 @@ public class BinaryUpdateRequestHandlerTest extends SolrTestCaseJ4 {
     SolrQueryRequest req = req();
     ContentStreamLoader csl = handler.newLoader(req, p);
 
-    csl.load(req, rsp, brw.getContentStream(ureq), p);
-
+    RequestWriter.ContentWriter cw = brw.getContentWriter(ureq);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    cw.write(baos);
+    ContentStreamBase.ByteArrayStream cs = new ContentStreamBase.ByteArrayStream(baos.toByteArray(),
null, "application/javabin");
+    csl.load(req, rsp, cs, p);
     AddUpdateCommand add = p.addCommands.get(0);
     System.out.println(add.solrDoc);
     assertEquals(false, add.overwrite);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4b316ec1/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
index 310c282..1f32e07 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
@@ -16,17 +16,17 @@
  */
 package org.apache.solr.client.solrj.impl;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.util.ContentStream;
 
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 /**
  * A RequestWriter which writes requests in the javabin format
  *
@@ -37,6 +37,32 @@ import java.util.List;
 public class BinaryRequestWriter extends RequestWriter {
 
   @Override
+  public ContentWriter getContentWriter(SolrRequest req) {
+    if (req instanceof UpdateRequest) {
+      UpdateRequest updateRequest = (UpdateRequest) req;
+      if (isNull(updateRequest.getDocuments()) &&
+          isNull(updateRequest.getDeleteByIdMap()) &&
+          isNull(updateRequest.getDeleteQuery())
+          && (updateRequest.getDocIterator() == null)) {
+        return null;
+      }
+      return new ContentWriter() {
+        @Override
+        public void write(OutputStream os) throws IOException {
+          new JavaBinUpdateRequestCodec().marshal(updateRequest, os);
+        }
+
+        @Override
+        public String getContentType() {
+          return "application/javabin";
+        }
+      };
+    } else {
+      return super.getContentWriter(req);
+    }
+  }
+
+  @Override
   public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException
{
     if (req instanceof UpdateRequest) {
       UpdateRequest updateRequest = (UpdateRequest) req;
@@ -46,13 +72,10 @@ public class BinaryRequestWriter extends RequestWriter {
               && (updateRequest.getDocIterator() == null) ) {
         return null;
       }
-      List<ContentStream> l = new ArrayList<>();
-      l.add(new LazyContentStream(updateRequest));
-      return l;
+      throw new RuntimeException("This Should not happen");
     } else {
       return super.getContentStreams(req);
     }
-
   }
 
 
@@ -62,46 +85,6 @@ public class BinaryRequestWriter extends RequestWriter {
   }
 
   @Override
-  public ContentStream getContentStream(final UpdateRequest request) throws IOException {
-    final BAOS baos = new BAOS();
-    new JavaBinUpdateRequestCodec().marshal(request, baos);
-    
-    return new ContentStream() {
-      @Override
-      public String getName() {
-        return null;
-      }
-
-      @Override
-      public String getSourceInfo() {
-        return "javabin";
-      }
-
-      @Override
-      public String getContentType() {
-        return "application/javabin";
-      }
-
-      @Override
-      public Long getSize() // size if we know it, otherwise null
-      {
-        return new Long(baos.size());
-      }
-
-      @Override
-      public InputStream getStream() {
-        return new ByteArrayInputStream(baos.getbuf(), 0, baos.size());
-      }
-
-      @Override
-      public Reader getReader() {
-        throw new RuntimeException("No reader available . this is a binarystream");
-      }
-    };
-  }
-
-
-  @Override
   public void write(SolrRequest request, OutputStream os) throws IOException {
     if (request instanceof UpdateRequest) {
       UpdateRequest updateRequest = (UpdateRequest) request;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4b316ec1/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index bf849fc..c292534 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
 import java.net.ConnectException;
@@ -52,6 +53,7 @@ import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.entity.mime.FormBodyPart;
@@ -327,7 +329,8 @@ public class HttpSolrClient extends SolrClient {
       request = ((V2RequestSupport) request).getV2Request();
     }
     SolrParams params = request.getParams();
-    Collection<ContentStream> streams = requestWriter.getContentStreams(request);
+    RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(request);
+    Collection<ContentStream> streams = contentWriter == null ? requestWriter.getContentStreams(request)
: null;
     String path = requestWriter.getPath(request);
     if (path == null || !path.startsWith("/")) {
       path = DEFAULT_PATH;
@@ -362,7 +365,7 @@ public class HttpSolrClient extends SolrClient {
     }
 
     if (SolrRequest.METHOD.GET == request.getMethod()) {
-      if (streams != null) {
+      if (streams != null || contentWriter != null) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!");
       }
 
@@ -389,65 +392,32 @@ public class HttpSolrClient extends SolrClient {
           || (streams != null && streams.size() > 1)) && !hasNullStreamName;
 
       LinkedList<NameValuePair> postOrPutParams = new LinkedList<>();
-      if (streams == null || isMultipart) {
-        // send server list and request list as query string params
-        ModifiableSolrParams queryParams = calculateQueryParams(this.queryParams, wparams);
-        queryParams.add(calculateQueryParams(request.getQueryParams(), wparams));
-        String fullQueryUrl = url + queryParams.toQueryString();
+
+      if(contentWriter != null) {
+        String fullQueryUrl = url + wparams.toQueryString();
         HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod()
?
             new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
-
-        if (!isMultipart) {
-          postOrPut.addHeader("Content-Type",
-              "application/x-www-form-urlencoded; charset=UTF-8");
-        }
-
-        List<FormBodyPart> parts = new LinkedList<>();
-        Iterator<String> iter = wparams.getParameterNamesIterator();
-        while (iter.hasNext()) {
-          String p = iter.next();
-          String[] vals = wparams.getParams(p);
-          if (vals != null) {
-            for (String v : vals) {
-              if (isMultipart) {
-                parts.add(new FormBodyPart(p, new StringBody(v, StandardCharsets.UTF_8)));
-              } else {
-                postOrPutParams.add(new BasicNameValuePair(p, v));
-              }
-            }
-          }
-        }
-
-        // TODO: remove deprecated - first simple attempt failed, see {@link MultipartEntityBuilder}
-        if (isMultipart && streams != null) {
-          for (ContentStream content : streams) {
-            String contentType = content.getContentType();
-            if (contentType == null) {
-              contentType = BinaryResponseParser.BINARY_CONTENT_TYPE; // default
-            }
-            String name = content.getName();
-            if (name == null) {
-              name = "";
-            }
-            parts.add(new FormBodyPart(name,
-                new InputStreamBody(
-                    content.getStream(),
-                    ContentType.parse(contentType),
-                    content.getName())));
+        postOrPut.addHeader("Content-Type",
+            contentWriter.getContentType());
+        postOrPut.setEntity(new BasicHttpEntity(){
+          @Override
+          public boolean isStreaming() {
+            return true;
           }
-        }
 
-        if (parts.size() > 0) {
-          MultipartEntity entity = new MultipartEntity(HttpMultipartMode.STRICT);
-          for (FormBodyPart p : parts) {
-            entity.addPart(p);
+          @Override
+          public void writeTo(OutputStream outstream) throws IOException {
+            contentWriter.write(outstream);
           }
-          postOrPut.setEntity(entity);
-        } else {
-          //not using multipart
-          postOrPut.setEntity(new UrlEncodedFormEntity(postOrPutParams, StandardCharsets.UTF_8));
-        }
+        });
+        return postOrPut;
 
+      } else if (streams == null || isMultipart) {
+        // send server list and request list as query string params
+        ModifiableSolrParams queryParams = calculateQueryParams(this.queryParams, wparams);
+        queryParams.add(calculateQueryParams(request.getQueryParams(), wparams));
+        String fullQueryUrl = url + queryParams.toQueryString();
+        HttpEntityEnclosingRequestBase postOrPut = fillContentStream(request, streams, wparams,
isMultipart, postOrPutParams, fullQueryUrl);
         return postOrPut;
       }
       // It is has one stream, it is the post body, put the params in the URL
@@ -455,49 +425,111 @@ public class HttpSolrClient extends SolrClient {
         String fullQueryUrl = url + wparams.toQueryString();
         HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod()
?
             new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
+        fillSingleContentStream(streams, postOrPut);
 
-        // Single stream as body
-        // Using a loop just to get the first one
-        final ContentStream[] contentStream = new ContentStream[1];
-        for (ContentStream content : streams) {
-          contentStream[0] = content;
-          break;
+        return postOrPut;
+      }
+    }
+
+    throw new SolrServerException("Unsupported method: " + request.getMethod());
+
+  }
+
+  private void fillSingleContentStream(Collection<ContentStream> streams, HttpEntityEnclosingRequestBase
postOrPut) throws IOException {
+    // Single stream as body
+    // Using a loop just to get the first one
+    final ContentStream[] contentStream = new ContentStream[1];
+    for (ContentStream content : streams) {
+      contentStream[0] = content;
+      break;
+    }
+    if (contentStream[0] instanceof RequestWriter.LazyContentStream) {
+      Long size = contentStream[0].getSize();
+      postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), size == null
? -1 : size) {
+        @Override
+        public Header getContentType() {
+          return new BasicHeader("Content-Type", contentStream[0].getContentType());
         }
-        if (contentStream[0] instanceof RequestWriter.LazyContentStream) {
-          Long size = contentStream[0].getSize();
-          postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), size ==
null ? -1 : size) {
-            @Override
-            public Header getContentType() {
-              return new BasicHeader("Content-Type", contentStream[0].getContentType());
-            }
 
-            @Override
-            public boolean isRepeatable() {
-              return false;
-            }
+        @Override
+        public boolean isRepeatable() {
+          return false;
+        }
 
-          });
-        } else {
-          Long size = contentStream[0].getSize();
-          postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), size ==
null ? -1 : size) {
-            @Override
-            public Header getContentType() {
-              return new BasicHeader("Content-Type", contentStream[0].getContentType());
-            }
+      });
+    } else {
+      Long size = contentStream[0].getSize();
+      postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), size == null
? -1 : size) {
+        @Override
+        public Header getContentType() {
+          return new BasicHeader("Content-Type", contentStream[0].getContentType());
+        }
 
-            @Override
-            public boolean isRepeatable() {
-              return false;
-            }
-          });
+        @Override
+        public boolean isRepeatable() {
+          return false;
+        }
+      });
+    }
+  }
+
+  private HttpEntityEnclosingRequestBase fillContentStream(SolrRequest request, Collection<ContentStream>
streams, ModifiableSolrParams wparams, boolean isMultipart, LinkedList<NameValuePair>
postOrPutParams, String fullQueryUrl) throws IOException {
+    HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod()
?
+        new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
+
+    if (!isMultipart) {
+      postOrPut.addHeader("Content-Type",
+          "application/x-www-form-urlencoded; charset=UTF-8");
+    }
+
+    List<FormBodyPart> parts = new LinkedList<>();
+    Iterator<String> iter = wparams.getParameterNamesIterator();
+    while (iter.hasNext()) {
+      String p = iter.next();
+      String[] vals = wparams.getParams(p);
+      if (vals != null) {
+        for (String v : vals) {
+          if (isMultipart) {
+            parts.add(new FormBodyPart(p, new StringBody(v, StandardCharsets.UTF_8)));
+          } else {
+            postOrPutParams.add(new BasicNameValuePair(p, v));
+          }
         }
-        return postOrPut;
       }
     }
 
-    throw new SolrServerException("Unsupported method: " + request.getMethod());
+    // TODO: remove deprecated - first simple attempt failed, see {@link MultipartEntityBuilder}
+    if (isMultipart && streams != null) {
+      for (ContentStream content : streams) {
+        String contentType = content.getContentType();
+        if (contentType == null) {
+          contentType = BinaryResponseParser.BINARY_CONTENT_TYPE; // default
+        }
+        String name = content.getName();
+        if (name == null) {
+          name = "";
+        }
+        parts.add(new FormBodyPart(name,
+            new InputStreamBody(
+                content.getStream(),
+                ContentType.parse(contentType),
+                content.getName())));
+      }
+    }
 
+    if (parts.size() > 0) {
+      MultipartEntity entity = new MultipartEntity(HttpMultipartMode.STRICT);
+      for (FormBodyPart p : parts) {
+        entity.addPart(p);
+      }
+      postOrPut.setEntity(entity);
+    } else {
+      //not using multipart
+      postOrPut.setEntity(new UrlEncodedFormEntity(postOrPutParams, StandardCharsets.UTF_8));
+    }
+    return postOrPut;
   }
+
   private static final List<String> errPath = Arrays.asList("metadata", "error-class");//Utils.getObjectByPath(err,
false,"metadata/error-class")
 
   protected NamedList<Object> executeMethod(HttpRequestBase method, final ResponseParser
processor, final boolean isV2Api) throws SolrServerException {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4b316ec1/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
index 89c29d6..8cefa0b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
@@ -16,18 +16,23 @@
  */
 package org.apache.solr.client.solrj.request;
 
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.util.ContentStream;
-import org.apache.solr.common.util.ContentStreamBase;
-
-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.ContentStreamBase;
 
 /**
  * A RequestWriter is used to write requests to Solr.
@@ -40,6 +45,23 @@ import java.nio.charset.StandardCharsets;
 public class RequestWriter {
   public static final Charset UTF_8 = StandardCharsets.UTF_8;
 
+
+  public interface ContentWriter {
+
+    void write(OutputStream os) throws IOException;
+
+    String getContentType();
+  }
+
+  /**
+   * Use this to do a push writing instead of pull. If this method returns null
+   * {@link org.apache.solr.client.solrj.request.RequestWriter#getContentStream(UpdateRequest)}
is
+   * invoked to do a pull write.
+   */
+  public ContentWriter getContentWriter(SolrRequest req) {
+    return null;
+  }
+
   public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException
{
     if (req instanceof UpdateRequest) {
       UpdateRequest updateRequest = (UpdateRequest) req;


Mime
View raw message