lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yo...@apache.org
Subject svn commit: r1209553 - in /lucene/dev/branches/solrcloud/solr: ./ core/src/java/org/apache/solr/update/ core/src/test/org/apache/solr/search/ solrj/src/java/org/apache/solr/common/util/
Date Fri, 02 Dec 2011 15:59:57 GMT
Author: yonik
Date: Fri Dec  2 15:59:56 2011
New Revision: 1209553

URL: http://svn.apache.org/viewvc?rev=1209553&view=rev
Log:
tests: stress internal buffering + replay code

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
    lucene/dev/branches/solrcloud/solr/testlogging.properties

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1209553&r1=1209552&r2=1209553&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/TransactionLog.java Fri Dec  2 15:59:56 2011
@@ -486,6 +486,7 @@ class ChannelFastInputStream extends Fas
     super(null);
     this.ch = ch;
     this.chPosition = chPosition;
+    super.readFromStream = chPosition;  // make sure position() method returns the correct
value
   }
 
   @Override

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1209553&r1=1209552&r2=1209553&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Fri Dec  2 15:59:56 2011
@@ -31,12 +31,15 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -968,6 +971,344 @@ public class TestRealTimeGet extends Sol
 
   }
 
+  // This points to the live model when state is ACTIVE, but a snapshot of the
+  // past when recovering.
+  volatile ConcurrentHashMap<Integer,DocInfo> visibleModel;
+
+  // This version simulates updates coming from the leader and sometimes being reordered
+  // and tests the ability to buffer updates and apply them later
+  @Test
+  public void testStressRecovery() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    final int commitPercent = 5 + random.nextInt(10);
+    final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are
soft
+    final int deletePercent = 4+random.nextInt(25);
+    final int deleteByQueryPercent = 0;  // real-time get isn't currently supported with
delete-by-query
+    final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+    int nWriteThreads = 2 + random.nextInt(10);  // fewer write threads to give recovery
thread more of a chance
+
+    final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time...
it should be <= maxWarmingSearchers
+
+        // query variables
+    final int percentRealtimeQuery = 75;
+    final AtomicLong operations = new AtomicLong(1000);  // number of recovery loops to perform
+    int nReadThreads = 2 + random.nextInt(10);  // fewer read threads to give writers more
of a chance
+
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+
+    final AtomicLong testVersion = new AtomicLong(0);
+
+
+    final UpdateHandler uHandler = h.getCore().getUpdateHandler();
+    final UpdateLog uLog = uHandler.getUpdateLog();
+    final VersionInfo vInfo = uLog.getVersionInfo();
+    final Object stateChangeLock = new Object();
+    this.visibleModel = model;
+    final Semaphore[] writePermissions = new Semaphore[nWriteThreads];
+    for (int i=0; i<nWriteThreads; i++) writePermissions[i] = new Semaphore(Integer.MAX_VALUE,
false);
+
+    final Semaphore readPermission = new Semaphore(Integer.MAX_VALUE, false);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      final int threadNum = i;
+
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+        Semaphore writePermission = writePermissions[threadNum];
+
+        @Override
+        public void run() {
+          try {
+          while (operations.get() > 0) {
+            writePermission.acquire();
+
+            int oper = rand.nextInt(10);
+
+            if (oper < commitPercent) {
+              if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                Map<Integer,DocInfo> newCommittedModel;
+                long version;
+
+                synchronized(TestRealTimeGet.this) {
+                  newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take
a snapshot
+                  version = snapshotCount++;
+                }
+
+                synchronized (stateChangeLock) {
+                  // These commits won't take affect if we are in recovery mode,
+                  // so change the version to -1 so we won't update our model.
+                  if (uLog.getState() != UpdateLog.State.ACTIVE) version = -1;
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    verbose("softCommit start");
+                    assertU(h.commit("softCommit","true"));
+                    verbose("softCommit end");
+                  } else {
+                    verbose("hardCommit start");
+                    assertU(commit());
+                    verbose("hardCommit end");
+                  }
+                }
+
+                synchronized(TestRealTimeGet.this) {
+                  // install this model snapshot only if it's newer than the current one
+                  // install this model only if we are not in recovery mode.
+                  if (version >= committedModelClock) {
+                    if (VERBOSE) {
+                      verbose("installing new committedModel version="+committedModelClock);
+                    }
+                    committedModel = newCommittedModel;
+                    committedModelClock = version;
+                  }
+                }
+              }
+              numCommitting.decrementAndGet();
+              continue;
+            }
+
+
+            int id;
+
+            if (rand.nextBoolean()) {
+              id = rand.nextInt(ndocs);
+            } else {
+              id = lastId;  // reuse the last ID half of the time to force more race conditions
+            }
+
+            // set the lastId before we actually change it sometimes to try and
+            // uncover more race conditions between writing and reading
+            boolean before = rand.nextBoolean();
+            if (before) {
+              lastId = id;
+            }
+
+            DocInfo info = model.get(id);
+
+            long val = info.val;
+            long nextVal = Math.abs(val)+1;
+
+            // the version we set on the update should determine who wins
+            // These versions are not derived from the actual leader update handler hand
hence this
+            // test may need to change depending on how we handle version numbers.
+            long version = testVersion.incrementAndGet();
+
+            // yield after getting the next version to increase the odds of updates happening
out of order
+            if (rand.nextBoolean()) Thread.yield();
+
+              if (oper < commitPercent + deletePercent) {
+                verbose("deleting id",id,"val=",nextVal,"version",version);
+
+                Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version),
SEEN_LEADER,SEEN_LEADER_VAL));
+
+                // TODO: returning versions for these types of updates is redundant
+                // but if we do return, they had better be equal
+                if (returnedVersion != null) {
+                  assertEquals(-version, returnedVersion.longValue());
+                }
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
+
+                verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
+              } else if (oper < commitPercent + deletePercent + deleteByQueryPercent)
{
+
+              } else {
+                verbose("adding id", id, "val=", nextVal,"version",version);
+
+                Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id),
field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
+                if (returnedVersion != null) {
+                  assertEquals(version, returnedVersion.longValue());
+                }
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (version > currInfo.version) {
+                    model.put(id, new DocInfo(version, nextVal));
+                  }
+                }
+
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
+                }
+
+              }
+            // }   // end sync
+
+            if (!before) {
+              lastId = id;
+            }
+          }
+        } catch (Throwable e) {
+          operations.set(-1L);
+          SolrException.log(log, e);
+          fail(e.getMessage());
+        }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.get() > 0) {
+              // throttle reads (don't completely stop)
+              readPermission.tryAcquire(10, TimeUnit.MILLISECONDS);
+
+
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+              DocInfo info;
+
+              if (realTime) {
+                info = visibleModel.get(id);
+              } else {
+                synchronized(TestRealTimeGet.this) {
+                  info = committedModel.get(id);
+                }
+              }
+
+
+              if  (VERBOSE) {
+                verbose("querying id", id);
+              }
+              SolrQueryRequest sreq;
+              if (realTime) {
+                sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+              } else {
+                sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              }
+
+              String response = h.query(sreq);
+              Map rsp = (Map)ObjectBuilder.fromJSON(response);
+              List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+              if (doclist.size() == 0) {
+                // there's no info we can get back with a delete, so not much we can check
without further synchronization
+              } else {
+                assertEquals(1, doclist.size());
+                long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+                long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+                if (foundVer < Math.abs(info.version)
+                    || (foundVer == info.version && foundVal != info.val) ) {   
// if the version matches, the val must
+                  verbose("ERROR, id=", id, "found=",response,"model",info);
+                  assertTrue(false);
+                }
+              }
+            }
+          }
+          catch (Throwable e) {
+            operations.set(-1L);
+            SolrException.log(log, e);
+            fail(e.getMessage());
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    int bufferedAddsApplied = 0;
+    do {
+      assertTrue(uLog.getState() == UpdateLog.State.ACTIVE);
+
+      // before we start buffering updates, we want to point
+      // visibleModel away from the live model.
+
+      visibleModel = new ConcurrentHashMap<Integer, DocInfo>(model);
+
+      synchronized (stateChangeLock) {
+        uLog.bufferUpdates();
+      }
+
+      assertTrue(uLog.getState() == UpdateLog.State.BUFFERING);
+
+      // sometimes wait for a second to allow time for writers to write something
+      if (random.nextBoolean()) Thread.sleep(random.nextInt(10)+1);
+
+      Future<UpdateLog.RecoveryInfo> recoveryInfoF = uLog.applyBufferedUpdates();
+      if (recoveryInfoF != null) {
+        UpdateLog.RecoveryInfo recInfo = null;
+
+        int writeThreadNumber = 0;
+        while (recInfo == null) {
+          try {
+            // wait a short period of time for recovery to complete (and to give a chance
for more writers to concurrently add docs)
+            recInfo = recoveryInfoF.get(random.nextInt(100/nWriteThreads), TimeUnit.MILLISECONDS);
+          } catch (TimeoutException e) {
+            // idle one more write thread
+            verbose("Operation",operations.get(),"Draining permits for write thread",writeThreadNumber);
+            writePermissions[writeThreadNumber++].drainPermits();
+            if (writeThreadNumber >= nWriteThreads) {
+              // if we hit the end, back up and give a few write permits
+              writeThreadNumber--;
+              writePermissions[writeThreadNumber].release(random.nextInt(2) + 1);
+            }
+
+            // throttle readers so they don't steal too much CPU from the recovery thread
+            readPermission.drainPermits();
+          }
+        }
+
+        bufferedAddsApplied += recInfo.adds;
+      }
+
+      // put all writers back at full blast
+      for (Semaphore writePerm : writePermissions) {
+        // I don't think semaphores check for overflow, so we need to check mow many remain
+        int neededPermits = Integer.MAX_VALUE - writePerm.availablePermits();
+        if (neededPermits > 0) writePerm.release( neededPermits );
+      }
+
+      // put back readers at full blast and point back to live model
+      visibleModel = model;
+      int neededPermits = Integer.MAX_VALUE - readPermission.availablePermits();
+      if (neededPermits > 0) readPermission.release( neededPermits );
+
+      verbose("ROUND=",operations.get());
+    } while (operations.decrementAndGet() > 0);
+
+    verbose("bufferedAddsApplied=",bufferedAddsApplied);
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+  }
+
+
+
+
+
+
+
 
   // The purpose of this test is to roughly model how solr uses lucene
   IndexReader reader;
@@ -982,7 +1323,7 @@ public class TestRealTimeGet extends Sol
 
     final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time...
it should be <= maxWarmingSearchers
 
-    final AtomicLong operations = new AtomicLong(10000);  // number of query operations to
perform in total - crank up if
+    final AtomicLong operations = new AtomicLong(1000);  // number of query operations to
perform in total - crank up if
     int nReadThreads = 5 + random.nextInt(25);
     final boolean tombstones = random.nextBoolean();
     final boolean syncCommits = random.nextBoolean();

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java?rev=1209553&r1=1209552&r2=1209553&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java Fri Dec  2 15:59:56 2011
@@ -27,7 +27,7 @@ public class FastInputStream extends Inp
   private final byte[] buf;
   private int pos;
   private int end;
-  private long readFromStream; // number of bytes read from the underlying inputstream
+  protected long readFromStream; // number of bytes read from the underlying inputstream
 
   public FastInputStream(InputStream in) {
   // use default BUFSIZE of BufferedOutputStream so if we wrap that

Modified: lucene/dev/branches/solrcloud/solr/testlogging.properties
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/testlogging.properties?rev=1209553&r1=1209552&r2=1209553&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/testlogging.properties (original)
+++ lucene/dev/branches/solrcloud/solr/testlogging.properties Fri Dec  2 15:59:56 2011
@@ -1,4 +1,4 @@
 handlers=java.util.logging.ConsoleHandler
-.level=INFO
+.level=SEVERE
 java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
 



Mime
View raw message