lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From thelabd...@apache.org
Subject svn commit: r1610856 - in /lucene/dev/trunk/solr: CHANGES.txt solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServerTest.java
Date Tue, 15 Jul 2014 21:29:01 GMT
Author: thelabdude
Date: Tue Jul 15 21:29:01 2014
New Revision: 1610856

URL: http://svn.apache.org/r1610856
Log:
SOLR-6136: Fix spin lock in blockUntilFinished (which eats CPU unnecessarily) using wait/notify
and add a unit test for ConcurrentUpdateSolrServer

Added:
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServerTest.java
  (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1610856&r1=1610855&r2=1610856&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Jul 15 21:29:01 2014
@@ -184,6 +184,8 @@ Bug Fixes
     schema swap-out process
   (Gregory Chanan via Steve Rowe)
 
+* SOLR-6136: ConcurrentUpdateSolrServer includes a Spin Lock (Brandon Chapman, Timothy Potter)
+
 Optimizations
 ---------------------
 

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java?rev=1610856&r1=1610855&r2=1610856&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java Tue Jul 15 21:29:01 2014
@@ -153,15 +153,15 @@ public class ConcurrentUpdateSolrServer 
 
       log.debug("starting runner: {}", this);
       HttpPost method = null;
-      HttpResponse response = null;
+      HttpResponse response = null;            
       try {
         while (!queue.isEmpty()) {
           try {
-            final UpdateRequest updateRequest = queue.poll(250,
-                TimeUnit.MILLISECONDS);
+            final UpdateRequest updateRequest = 
+                queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
             if (updateRequest == null)
               break;
-
+                       
             String contentType = server.requestWriter.getUpdateContentType();
             final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
 
@@ -174,9 +174,9 @@ public class ConcurrentUpdateSolrServer 
                 try {
                   if (isXml) {
                     out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
-                  }
+                  }                                    
                   UpdateRequest req = updateRequest;
-                  while (req != null) {
+                  while (req != null) {                                        
                     SolrParams currentParams = new ModifiableSolrParams(req.getParams());
                     if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
                       queue.add(req); // params are different, push back to queue
@@ -206,6 +206,7 @@ public class ConcurrentUpdateSolrServer 
                     out.flush();
                     req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
                   }
+                  
                   if (isXml) {
                     out.write("</stream>".getBytes(StandardCharsets.UTF_8));
                   }
@@ -228,15 +229,13 @@ public class ConcurrentUpdateSolrServer 
             method.setEntity(template);
             method.addHeader("User-Agent", HttpSolrServer.AGENT);
             method.addHeader("Content-Type", contentType);
-            
-            
+                        
             response = server.getHttpClient().execute(method);
             int statusCode = response.getStatusLine().getStatusCode();
             if (statusCode != HttpStatus.SC_OK) {
               StringBuilder msg = new StringBuilder();
               msg.append(response.getStatusLine().getReasonPhrase());
-              msg.append("\n\n");
-              msg.append("\n\n");
+              msg.append("\n\n\n\n");
               msg.append("request: ").append(method.getURI());
               handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString()));
             } else {
@@ -264,6 +263,8 @@ public class ConcurrentUpdateSolrServer 
             scheduler.execute(this);
           } else {
             runners.remove(this);
+            if (runners.isEmpty())
+              runners.notifyAll();
           }
         }
 
@@ -319,17 +320,11 @@ public class ConcurrentUpdateSolrServer 
 
       for (;;) {
         synchronized (runners) {
-          if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() // queue
-                                                                             // is
-                                                                             // half
-                                                                             // full
-                                                                             // and
-                                                                             // we
-                                                                             // can
-                                                                             // add
-                                                                             // more
-                                                                             // runners
-              && runners.size() < threadCount)) {
+          // see if queue is half full and we can add more runners
+          // special case: if only using a threadCount of 1 and the queue
+          // is filling up, allow 1 add'l runner to help process the queue
+          if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
+          {
             // We need more runners, so start a new one.
             Runner r = new Runner();
             runners.add(r);
@@ -358,9 +353,7 @@ public class ConcurrentUpdateSolrServer 
         if (!success) {
           success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
         }
-
       }
-
     } catch (InterruptedException e) {
       log.error("interrupted", e);
       throw new IOException(e.getLocalizedMessage());
@@ -375,27 +368,27 @@ public class ConcurrentUpdateSolrServer 
   public synchronized void blockUntilFinished() {
     lock = new CountDownLatch(1);
     try {
-      // Wait until no runners are running
-      for (;;) {
-        Runner runner;
-        synchronized (runners) {
-          runner = runners.peek();
-        }
-
-        if ((runner == null && queue.isEmpty()) || scheduler.isTerminated())
-          break;
-        
-        if (runner != null) {
-          runner.runnerLock.lock();
-          runner.runnerLock.unlock();
-        } else if (!queue.isEmpty()) {
-          // failsafe - should not be necessary, but a good
-          // precaution to ensure blockUntilFinished guarantees
-          // all updates are emptied from the queue regardless of
-          // any bugs around starting or retaining runners
-          Runner r = new Runner();
-          runners.add(r);
-          scheduler.execute(r);
+      synchronized (runners) {
+        while (!runners.isEmpty()) {
+          try {
+            runners.wait();
+          } catch (InterruptedException e) {
+            Thread.interrupted();
+          }
+          
+          if (scheduler.isTerminated())
+            break;
+                      
+          // if we reach here, then we probably got the notifyAll, but need to check if
+          // the queue is empty before really considering this is finished (SOLR-4260)
+          int queueSize = queue.size();
+          if (queueSize > 0) {
+            log.warn("No more runners, but queue still has "+
+              queueSize+" adding more runners to process remaining requests on queue");
+            Runner r = new Runner();
+            runners.add(r);
+            scheduler.execute(r);
+          }
         }
       }
     } finally {
@@ -450,15 +443,15 @@ public class ConcurrentUpdateSolrServer 
     if (shutdownExecutor) {
       scheduler.shutdownNow(); // Cancel currently executing tasks
       try {
-        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
-            .error("ExecutorService did not terminate");
+        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) 
+          log.error("ExecutorService did not terminate");
       } catch (InterruptedException ie) {
         scheduler.shutdownNow();
         Thread.currentThread().interrupt();
       }
-    }
+    }    
   }
-
+  
   public void setParser(ResponseParser responseParser) {
     server.setParser(responseParser);
   }

Added: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServerTest.java?rev=1610856&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServerTest.java (added)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServerTest.java Tue Jul 15 21:29:01 2014
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.HttpResponse;
+import org.apache.solr.SolrJettyTestBase;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.util.ExternalPaths;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ConcurrentUpdateSolrServerTest extends SolrJettyTestBase {
+
+  /**
+   * Mock endpoint where the CUSS being tested in this class sends requests.
+   */
+  public static class TestServlet extends HttpServlet 
+    implements JavaBinUpdateRequestCodec.StreamingUpdateHandler
+  {   
+    private static final long serialVersionUID = 1L;
+
+    public static void clear() {
+      lastMethod = null;
+      headers = null;
+      parameters = null;
+      errorCode = null;
+      numReqsRcvd.set(0);
+      numDocsRcvd.set(0);
+    }
+    
+    public static Integer errorCode = null;
+    public static String lastMethod = null;
+    public static HashMap<String,String> headers = null;
+    public static Map<String,String[]> parameters = null;
+    public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
+    public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
+    
+    public static void setErrorCode(Integer code) {
+      errorCode = code;
+    }
+        
+    private void setHeaders(HttpServletRequest req) {
+      Enumeration<String> headerNames = req.getHeaderNames();
+      headers = new HashMap<>();
+      while (headerNames.hasMoreElements()) {
+        final String name = headerNames.nextElement();
+        headers.put(name, req.getHeader(name));
+      }
+    }
+
+    private void setParameters(HttpServletRequest req) {
+      parameters = req.getParameterMap();
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      
+      numReqsRcvd.incrementAndGet();
+      lastMethod = "post";
+      recordRequest(req, resp);
+            
+      InputStream reqIn = req.getInputStream();
+      JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
+      for (;;) {
+        try {
+          javabin.unmarshal(reqIn, this);
+        } catch (EOFException e) {
+          break; // this is expected
+        }
+      }      
+    }
+    
+    private void recordRequest(HttpServletRequest req, HttpServletResponse resp) {
+      setHeaders(req);
+      setParameters(req);
+      if (null != errorCode) {
+        try { 
+          resp.sendError(errorCode); 
+        } catch (IOException e) {
+          throw new RuntimeException("sendError IO fail in TestServlet", e);
+        }
+      }
+    }
+
+    @Override
+    public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean override) {
+      numDocsRcvd.incrementAndGet();
+    }
+  } // end TestServlet
+  
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    createJetty(ExternalPaths.EXAMPLE_HOME, null, null);
+    jetty.getDispatchFilter().getServletHandler()
+        .addServletWithMapping(TestServlet.class, "/cuss/*");
+  }
+  
+  @Test
+  public void testConcurrentUpdate() throws Exception {
+    TestServlet.clear();
+    
+    String serverUrl = jetty.getBaseUrl().toString() + "/cuss/foo";
+        
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+    
+    // for tracking callbacks from CUSS
+    final AtomicInteger successCounter = new AtomicInteger(0);
+    final AtomicInteger errorCounter = new AtomicInteger(0);    
+    final StringBuilder errors = new StringBuilder();     
+    
+    @SuppressWarnings("serial")
+    ConcurrentUpdateSolrServer cuss = new ConcurrentUpdateSolrServer(serverUrl, cussQueueSize, cussThreadCount) {
+      @Override
+      public void handleError(Throwable ex) {
+        errorCounter.incrementAndGet();
+        errors.append(" "+ex);
+      }
+      @Override
+      public void onSuccess(HttpResponse resp) {
+        successCounter.incrementAndGet();
+      }
+    };
+    
+    cuss.setParser(new BinaryResponseParser());
+    cuss.setRequestWriter(new BinaryRequestWriter());
+    cuss.setPollQueueTime(0);
+    
+    // ensure it doesn't block where there's nothing to do yet
+    cuss.blockUntilFinished();
+    
+    int poolSize = 5;
+    ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
+
+    int numDocs = 100;
+    int numRunnables = 5;
+    for (int r=0; r < numRunnables; r++)
+      threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, cuss));
+    
+    // ensure all docs are sent
+    threadPool.awaitTermination(5, TimeUnit.SECONDS);
+    threadPool.shutdown();
+    
+    // wait until all requests are processed by CUSS 
+    cuss.blockUntilFinished();
+    cuss.shutdownNow();    
+    
+    assertEquals("post", TestServlet.lastMethod);
+        
+    // expect all requests to be successful
+    int expectedSuccesses = TestServlet.numReqsRcvd.get();
+    assertTrue(expectedSuccesses > 0); // at least one request must have been sent
+    
+    assertTrue("Expected no errors but got "+errorCounter.get()+
+        ", due to: "+errors.toString(), errorCounter.get() == 0);
+    assertTrue("Expected "+expectedSuccesses+" successes, but got "+successCounter.get(),
+        successCounter.get() == expectedSuccesses);
+    
+    int expectedDocs = numDocs * numRunnables;
+    assertTrue("Expected CUSS to send "+expectedDocs+" but got "+TestServlet.numDocsRcvd.get(),
+        TestServlet.numDocsRcvd.get() == expectedDocs);
+  }
+  
+  class SendDocsRunnable implements Runnable {
+    
+    private String id;
+    private int numDocs;
+    private ConcurrentUpdateSolrServer cuss;
+    
+    SendDocsRunnable(String id, int numDocs, ConcurrentUpdateSolrServer cuss) {
+      this.id = id;
+      this.numDocs = numDocs;
+      this.cuss = cuss;
+    }
+
+    @Override
+    public void run() {
+      for (int d=0; d < numDocs; d++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        String docId = id+"_"+d;
+        doc.setField("id", docId);    
+        UpdateRequest req = new UpdateRequest();
+        req.add(doc);        
+        try {
+          cuss.request(req);
+        } catch (Throwable t) {
+          t.printStackTrace();
+        }
+      }      
+    }    
+  }
+}



Mime
View raw message