Author: yonik
Date: Wed Jul 4 16:02:42 2012
New Revision: 1357324
URL: http://svn.apache.org/viewvc?rev=1357324&view=rev
Log:
tests: refactor TestRealTimeGet
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRTGBase.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressLucene.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressRecovery.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressReorder.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressVersions.java (with props)
Modified:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRTGBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRTGBase.java?rev=1357324&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRTGBase.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRTGBase.java Wed Jul 4 16:02:42 2012
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.update.UpdateLog;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * Shared state and helpers for the real-time-get tests, factored out of
+ * {@link TestRealTimeGet}: an in-memory model of expected document state,
+ * fake-version helpers, and small update-log / index lookup utilities.
+ */
+public class TestRTGBase extends SolrTestCaseJ4 {
+
+  // means we've seen the leader and have version info (i.e. we are a non-leader replica)
+  public static final String FROM_LEADER = DistribPhase.FROMLEADER.toString();
+
+  /**
+   * Deletes every document with a delete-by-query that carries a made-up version
+   * and is flagged as coming from the leader.
+   *
+   * Since we make up fake versions in these tests, we can get messed up by a DBQ
+   * with a real version, since Solr can think following updates were reordered.
+   */
+  @Override
+  public void clearIndex() {
+    try {
+      deleteByQueryAndGetVersion("*:*", params("_version_", Long.toString(-Long.MAX_VALUE), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // expected latest state of each document, keyed by integer id
+  protected final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<Integer,DocInfo>();
+  // starts out as a copy of "model" (see initModel)
+  protected Map<Integer,DocInfo> committedModel = new HashMap<Integer,DocInfo>();
+  protected long snapshotCount;
+  protected long committedModelClock;
+  protected volatile int lastId;
+  protected final String field = "val_l";
+  // one monitor object per document id, allocated by initModel
+  protected Object[] syncArr;
+
+  // NOTE(review): presumably the monitor subclasses synchronize on where TestRealTimeGet used "this" - confirm
+  protected Object globalLock = this;
+
+  /**
+   * Resets the counters and rebuilds the model for ids {@code 0..ndocs-1}.
+   * Each id starts as {@code DocInfo(0, -1L)} and gets its own object in
+   * {@link #syncArr}; {@link #committedModel} starts as a copy of {@link #model}.
+   * Entries left over from an earlier call are discarded first.
+   *
+   * @param ndocs number of document ids to model
+   */
+  protected void initModel(int ndocs) {
+    snapshotCount = 0;
+    committedModelClock = 0;
+    lastId = 0;
+
+    syncArr = new Object[ndocs];
+
+    // drop stale entries so a repeated call with a smaller ndocs starts clean
+    model.clear();
+    committedModel.clear();
+
+    for (int i=0; i<ndocs; i++) {
+      model.put(i, new DocInfo(0, -1L));
+      syncArr[i] = new Object();
+    }
+    committedModel.putAll(model);
+  }
+
+  /** Version/value pair recorded in the model for a single document. */
+  protected static class DocInfo {
+    long version;
+    long val;
+
+    public DocInfo(long version, long val) {
+      this.version = version;
+      this.val = val;
+    }
+
+    @Override
+    public String toString() {
+      return "{version="+version+",val="+val+"}";
+    }
+  }
+
+  /**
+   * Picks a random non-zero version that differs from {@code version}.
+   *
+   * @param rand source of randomness
+   * @param version the version to avoid; when it is not positive the result is always positive
+   * @return a non-zero value different from {@code version}
+   */
+  protected long badVersion(Random rand, long version) {
+    if (version > 0) {
+      // return a random number not equal to version
+      for (;;) {
+        long badVersion = rand.nextInt();
+        if (badVersion != version && badVersion != 0) return badVersion;
+      }
+    }
+
+    // if the version does not exist, then we can only specify a positive version
+    for (;;) {
+      long badVersion = rand.nextInt() & 0x7fffffff; // mask off sign bit
+      if (badVersion != 0) return badVersion;
+    }
+  }
+
+  /**
+   * Returns the versions reported by {@code getVersions(100)} on the recent updates
+   * of the core's update log; the recent-updates handle is closed before returning.
+   */
+  protected List<Long> getLatestVersions() {
+    UpdateLog.RecentUpdates startingRecentUpdates = h.getCore().getUpdateHandler().getUpdateLog().getRecentUpdates();
+    try {
+      return startingRecentUpdates.getVersions(100);
+    } finally {
+      startingRecentUpdates.close();
+    }
+  }
+
+  /**
+   * Looks up the live documents containing term {@code t} and asserts that
+   * there is at most one of them.
+   *
+   * @param r reader to search
+   * @param t term to look up
+   * @return the number of the single matching document, or -1 if the field or
+   *         term is absent or no live document matches
+   * @throws IOException if reading the index fails
+   */
+  protected int getFirstMatch(IndexReader r, Term t) throws IOException {
+    Fields fields = MultiFields.getFields(r);
+    if (fields == null) return -1;
+    Terms terms = fields.terms(t.field());
+    if (terms == null) return -1;
+    BytesRef termBytes = t.bytes();
+    final TermsEnum termsEnum = terms.iterator(null);
+    if (!termsEnum.seekExact(termBytes, false)) {
+      return -1;
+    }
+    DocsEnum docs = termsEnum.docs(MultiFields.getLiveDocs(r), null, false);
+    int id = docs.nextDoc();
+    if (id == DocIdSetIterator.NO_MORE_DOCS) {
+      return -1;
+    }
+    // a second hit would mean the term matches more than one live document
+    assertEquals(DocIdSetIterator.NO_MORE_DOCS, docs.nextDoc());
+    return id;
+  }
+
+}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1357324&r1=1357323&r2=1357324&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Wed Jul 4 16:02:42 2012
@@ -17,63 +17,33 @@
package org.apache.solr.search;
-import org.apache.lucene.analysis.MockAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FieldType;
-import org.apache.lucene.index.*;
-import org.apache.lucene.search.*;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.noggit.ObjectBuilder;
-import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.TestHarness;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.solr.core.SolrCore.verbose;
+import static org.apache.solr.core.SolrCore.verbose;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
-@Slow
-public class TestRealTimeGet extends SolrTestCaseJ4 {
-
- // means we've seen the leader and have version info (i.e. we are a non-leader replica)
- private static String FROM_LEADER = DistribPhase.FROMLEADER.toString();
+public class TestRealTimeGet extends TestRTGBase {
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig-tlog.xml","schema15.xml");
}
- // since we make up fake versions in these tests, we can get messed up by a DBQ with a real version
- // since Solr can think following updates were reordered.
- @Override
- public void clearIndex() {
- try {
- deleteByQueryAndGetVersion("*:*", params("_version_", Long.toString(-Long.MAX_VALUE), DISTRIB_UPDATE_PARAM,FROM_LEADER));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
@Test
public void testGetRealtime() throws Exception {
@@ -428,59 +398,6 @@ public class TestRealTimeGet extends Sol
***/
- final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<Integer,DocInfo>();
- Map<Integer,DocInfo> committedModel = new HashMap<Integer,DocInfo>();
- long snapshotCount;
- long committedModelClock;
- volatile int lastId;
- final String field = "val_l";
- Object[] syncArr;
-
- private void initModel(int ndocs) {
- snapshotCount = 0;
- committedModelClock = 0;
- lastId = 0;
-
- syncArr = new Object[ndocs];
-
- for (int i=0; i<ndocs; i++) {
- model.put(i, new DocInfo(0, -1L));
- syncArr[i] = new Object();
- }
- committedModel.putAll(model);
- }
-
-
- static class DocInfo {
- long version;
- long val;
-
- public DocInfo(long version, long val) {
- this.version = version;
- this.val = val;
- }
-
- public String toString() {
- return "{version="+version+",val="+val+"}";
- }
- }
-
- private long badVersion(Random rand, long version) {
- if (version > 0) {
- // return a random number not equal to version
- for (;;) {
- long badVersion = rand.nextInt();
- if (badVersion != version && badVersion != 0) return badVersion;
- }
- }
-
- // if the version does not exist, then we can only specify a positive version
- for (;;) {
- long badVersion = rand.nextInt() & 0x7fffffff; // mask off sign bit
- if (badVersion != 0) return badVersion;
- }
- }
-
@Test
public void testStressGetRealtime() throws Exception {
clearIndex();
@@ -769,1244 +686,4 @@ public class TestRealTimeGet extends Sol
}
- // This version doesn't synchronize on id to tell what update won, but instead uses versions
- @Test
- public void testStressGetRealtimeVersions() throws Exception {
- clearIndex();
- assertU(commit());
-
- final int commitPercent = 5 + random().nextInt(20);
- final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
- final int deletePercent = 4+random().nextInt(25);
- final int deleteByQueryPercent = 1 + random().nextInt(5);
- final int optimisticPercent = 1+random().nextInt(50); // percent change that an update uses optimistic locking
- final int optimisticCorrectPercent = 25+random().nextInt(70); // percent change that a version specified will be correct
- final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
- int nWriteThreads = 5 + random().nextInt(25);
-
- final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
-
- // query variables
- final int percentRealtimeQuery = 75;
- final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
- int nReadThreads = 5 + random().nextInt(25);
-
-
-
- initModel(ndocs);
-
- final AtomicInteger numCommitting = new AtomicInteger();
-
- List<Thread> threads = new ArrayList<Thread>();
-
- for (int i=0; i<nWriteThreads; i++) {
- Thread thread = new Thread("WRITER"+i) {
- Random rand = new Random(random().nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.get() > 0) {
- int oper = rand.nextInt(100);
-
- if (oper < commitPercent) {
- if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,DocInfo> newCommittedModel;
- long version;
-
- synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
- version = snapshotCount++;
- }
-
- if (rand.nextInt(100) < softCommitPercent) {
- verbose("softCommit start");
- assertU(TestHarness.commit("softCommit","true"));
- verbose("softCommit end");
- } else {
- verbose("hardCommit start");
- assertU(commit());
- verbose("hardCommit end");
- }
-
- synchronized(TestRealTimeGet.this) {
- // install this model snapshot only if it's newer than the current one
- if (version >= committedModelClock) {
- if (VERBOSE) {
- verbose("installing new committedModel version="+committedModelClock);
- }
- committedModel = newCommittedModel;
- committedModelClock = version;
- }
- }
- }
- numCommitting.decrementAndGet();
- continue;
- }
-
-
- int id = rand.nextInt(ndocs);
- Object sync = syncArr[id];
-
- // set the lastId before we actually change it sometimes to try and
- // uncover more race conditions between writing and reading
- boolean before = rand.nextBoolean();
- if (before) {
- lastId = id;
- }
-
- // We can't concurrently update the same document and retain our invariants of increasing values
- // since we can't guarantee what order the updates will be executed.
- // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
- //
- // NOTE: versioning means we can now remove the sync and tell what update "won"
- // synchronized (sync) {
- DocInfo info = model.get(id);
-
- long val = info.val;
- long nextVal = Math.abs(val)+1;
-
- if (oper < commitPercent + deletePercent) {
- verbose("deleting id",id,"val=",nextVal);
-
- Long version = deleteAndGetVersion(Integer.toString(id), null);
- assertTrue(version < 0);
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (Math.abs(version) > Math.abs(currInfo.version)) {
- model.put(id, new DocInfo(version, -nextVal));
- }
- }
-
- verbose("deleting id", id, "val=",nextVal,"DONE");
- } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
- verbose("deleteByQyery id",id,"val=",nextVal);
-
- Long version = deleteByQueryAndGetVersion("id:"+Integer.toString(id), null);
- assertTrue(version < 0);
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (Math.abs(version) > Math.abs(currInfo.version)) {
- model.put(id, new DocInfo(version, -nextVal));
- }
- }
-
- verbose("deleteByQyery id", id, "val=",nextVal,"DONE");
- } else {
- verbose("adding id", id, "val=", nextVal);
-
- // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
- Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)), null);
- assertTrue(version > 0);
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (version > currInfo.version) {
- model.put(id, new DocInfo(version, nextVal));
- }
- }
-
- if (VERBOSE) {
- verbose("adding id", id, "val=", nextVal,"DONE");
- }
-
- }
- // } // end sync
-
- if (!before) {
- lastId = id;
- }
- }
- } catch (Throwable e) {
- operations.set(-1L);
- throw new RuntimeException(e);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (int i=0; i<nReadThreads; i++) {
- Thread thread = new Thread("READER"+i) {
- Random rand = new Random(random().nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.decrementAndGet() >= 0) {
- // bias toward a recently changed doc
- int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
-
- // when indexing, we update the index, then the model
- // so when querying, we should first check the model, and then the index
-
- boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
- DocInfo info;
-
- if (realTime) {
- info = model.get(id);
- } else {
- synchronized(TestRealTimeGet.this) {
- info = committedModel.get(id);
- }
- }
-
- if (VERBOSE) {
- verbose("querying id", id);
- }
- SolrQueryRequest sreq;
- if (realTime) {
- sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
- } else {
- sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
- }
-
- String response = h.query(sreq);
- Map rsp = (Map)ObjectBuilder.fromJSON(response);
- List doclist = (List)(((Map)rsp.get("response")).get("docs"));
- if (doclist.size() == 0) {
- // there's no info we can get back with a delete, so not much we can check without further synchronization
- } else {
- assertEquals(1, doclist.size());
- long foundVal = (Long)(((Map)doclist.get(0)).get(field));
- long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
- if (foundVer < Math.abs(info.version)
- || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
- verbose("ERROR, id=", id, "found=",response,"model",info);
- assertTrue(false);
- }
- }
- }
- } catch (Throwable e) {
- operations.set(-1L);
- throw new RuntimeException(e);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (Thread thread : threads) {
- thread.start();
- }
-
- for (Thread thread : threads) {
- thread.join();
- }
-
- }
-
- // This version simulates updates coming from the leader and sometimes being reordered
- @Test
- public void testStressReorderVersions() throws Exception {
- clearIndex();
- assertU(commit());
-
- final int commitPercent = 5 + random().nextInt(20);
- final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
- final int deletePercent = 4+random().nextInt(25);
- final int deleteByQueryPercent = 0; // 1+random().nextInt(7);
- final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
- int nWriteThreads = 5 + random().nextInt(25);
-
- final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
-
- // query variables
- final int percentRealtimeQuery = 75;
- final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
- int nReadThreads = 5 + random().nextInt(25);
-
-
- /** // testing
- final int commitPercent = 5;
- final int softCommitPercent = 100; // what percent of the commits are soft
- final int deletePercent = 0;
- final int deleteByQueryPercent = 50;
- final int ndocs = 1;
- int nWriteThreads = 2;
-
- final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
-
- // query variables
- final int percentRealtimeQuery = 101;
- final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
- int nReadThreads = 1;
- **/
-
-
- initModel(ndocs);
-
- final AtomicInteger numCommitting = new AtomicInteger();
-
- List<Thread> threads = new ArrayList<Thread>();
-
-
- final AtomicLong testVersion = new AtomicLong(0);
-
- for (int i=0; i<nWriteThreads; i++) {
- Thread thread = new Thread("WRITER"+i) {
- Random rand = new Random(random().nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.get() > 0) {
- int oper = rand.nextInt(100);
-
- if (oper < commitPercent) {
- if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,DocInfo> newCommittedModel;
- long version;
-
- synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
- version = snapshotCount++;
- }
-
- if (rand.nextInt(100) < softCommitPercent) {
- verbose("softCommit start");
- assertU(TestHarness.commit("softCommit","true"));
- verbose("softCommit end");
- } else {
- verbose("hardCommit start");
- assertU(commit());
- verbose("hardCommit end");
- }
-
- synchronized(TestRealTimeGet.this) {
- // install this model snapshot only if it's newer than the current one
- if (version >= committedModelClock) {
- if (VERBOSE) {
- verbose("installing new committedModel version="+committedModelClock);
- }
- committedModel = newCommittedModel;
- committedModelClock = version;
- }
- }
- }
- numCommitting.decrementAndGet();
- continue;
- }
-
-
- int id;
-
- if (rand.nextBoolean()) {
- id = rand.nextInt(ndocs);
- } else {
- id = lastId; // reuse the last ID half of the time to force more race conditions
- }
-
- // set the lastId before we actually change it sometimes to try and
- // uncover more race conditions between writing and reading
- boolean before = rand.nextBoolean();
- if (before) {
- lastId = id;
- }
-
- DocInfo info = model.get(id);
-
- long val = info.val;
- long nextVal = Math.abs(val)+1;
-
- // the version we set on the update should determine who wins
- // These versions are not derived from the actual leader update handler hand hence this
- // test may need to change depending on how we handle version numbers.
- long version = testVersion.incrementAndGet();
-
- // yield after getting the next version to increase the odds of updates happening out of order
- if (rand.nextBoolean()) Thread.yield();
-
- if (oper < commitPercent + deletePercent) {
- verbose("deleting id",id,"val=",nextVal,"version",version);
-
- Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
-
- // TODO: returning versions for these types of updates is redundant
- // but if we do return, they had better be equal
- if (returnedVersion != null) {
- assertEquals(-version, returnedVersion.longValue());
- }
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (Math.abs(version) > Math.abs(currInfo.version)) {
- model.put(id, new DocInfo(version, -nextVal));
- }
- }
-
- verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
- } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
-
- verbose("deleteByQuery id",id,"val=",nextVal,"version",version);
-
- Long returnedVersion = deleteByQueryAndGetVersion("id:"+Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
-
- // TODO: returning versions for these types of updates is redundant
- // but if we do return, they had better be equal
- if (returnedVersion != null) {
- assertEquals(-version, returnedVersion.longValue());
- }
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (Math.abs(version) > Math.abs(currInfo.version)) {
- model.put(id, new DocInfo(version, -nextVal));
- }
- }
-
- verbose("deleteByQuery id", id, "val=",nextVal,"version",version,"DONE");
-
- } else {
- verbose("adding id", id, "val=", nextVal,"version",version);
-
- Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
- if (returnedVersion != null) {
- assertEquals(version, returnedVersion.longValue());
- }
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (version > currInfo.version) {
- model.put(id, new DocInfo(version, nextVal));
- }
- }
-
- if (VERBOSE) {
- verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
- }
-
- }
- // } // end sync
-
- if (!before) {
- lastId = id;
- }
- }
- } catch (Throwable e) {
- operations.set(-1L);
- log.error("",e);
- throw new RuntimeException(e);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (int i=0; i<nReadThreads; i++) {
- Thread thread = new Thread("READER"+i) {
- Random rand = new Random(random().nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.decrementAndGet() >= 0) {
- // bias toward a recently changed doc
- int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
-
- // when indexing, we update the index, then the model
- // so when querying, we should first check the model, and then the index
-
- boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
- DocInfo info;
-
- if (realTime) {
- info = model.get(id);
- } else {
- synchronized(TestRealTimeGet.this) {
- info = committedModel.get(id);
- }
- }
-
- if (VERBOSE) {
- verbose("querying id", id);
- }
- SolrQueryRequest sreq;
- if (realTime) {
- sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
- } else {
- sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
- }
-
- String response = h.query(sreq);
- Map rsp = (Map)ObjectBuilder.fromJSON(response);
- List doclist = (List)(((Map)rsp.get("response")).get("docs"));
- if (doclist.size() == 0) {
- // there's no info we can get back with a delete, so not much we can check without further synchronization
- } else {
- assertEquals(1, doclist.size());
- long foundVal = (Long)(((Map)doclist.get(0)).get(field));
- long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
- if (foundVer < Math.abs(info.version)
- || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
- verbose("ERROR, id=", id, "found=",response,"model",info);
- assertTrue(false);
- }
- }
- }
- } catch (Throwable e) {
- operations.set(-1L);
- log.error("",e);
- throw new RuntimeException(e);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (Thread thread : threads) {
- thread.start();
- }
-
- for (Thread thread : threads) {
- thread.join();
- }
-
- }
-
-
-
-
-
-
-
- // This points to the live model when state is ACTIVE, but a snapshot of the
- // past when recovering.
- volatile ConcurrentHashMap<Integer,DocInfo> visibleModel;
-
- // This version simulates updates coming from the leader and sometimes being reordered
- // and tests the ability to buffer updates and apply them later
- @Test
- public void testStressRecovery() throws Exception {
- assumeFalse("FIXME: This test is horribly slow sometimes on Windows!", Constants.WINDOWS);
- clearIndex();
- assertU(commit());
-
- final int commitPercent = 5 + random().nextInt(10);
- final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
- final int deletePercent = 4+random().nextInt(25);
- final int deleteByQueryPercent = random().nextInt(5); // real-time get isn't currently supported with delete-by-query
- final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
- int nWriteThreads = 2 + random().nextInt(10); // fewer write threads to give recovery thread more of a chance
-
- final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
-
- // query variables
- final int percentRealtimeQuery = 75;
- final int percentGetLatestVersions = random().nextInt(4);
- final AtomicLong operations = new AtomicLong(atLeast(75)); // number of recovery loops to perform
- int nReadThreads = 2 + random().nextInt(10); // fewer read threads to give writers more of a chance
-
- initModel(ndocs);
-
- final AtomicInteger numCommitting = new AtomicInteger();
-
- List<Thread> threads = new ArrayList<Thread>();
-
-
- final AtomicLong testVersion = new AtomicLong(0);
-
-
- final UpdateHandler uHandler = h.getCore().getUpdateHandler();
- final UpdateLog uLog = uHandler.getUpdateLog();
- final VersionInfo vInfo = uLog.getVersionInfo();
- final Object stateChangeLock = new Object();
- this.visibleModel = model;
- final Semaphore[] writePermissions = new Semaphore[nWriteThreads];
- for (int i=0; i<nWriteThreads; i++) writePermissions[i] = new Semaphore(Integer.MAX_VALUE, false);
-
- final Semaphore readPermission = new Semaphore(Integer.MAX_VALUE, false);
-
- for (int i=0; i<nWriteThreads; i++) {
- final int threadNum = i;
-
- Thread thread = new Thread("WRITER"+i) {
- Random rand = new Random(random().nextInt());
- Semaphore writePermission = writePermissions[threadNum];
-
- @Override
- public void run() {
- try {
- while (operations.get() > 0) {
- writePermission.acquire();
-
- int oper = rand.nextInt(10);
-
- if (oper < commitPercent) {
- if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,DocInfo> newCommittedModel;
- long version;
-
- synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
- version = snapshotCount++;
- }
-
- synchronized (stateChangeLock) {
- // These commits won't take affect if we are in recovery mode,
- // so change the version to -1 so we won't update our model.
- if (uLog.getState() != UpdateLog.State.ACTIVE) version = -1;
- if (rand.nextInt(100) < softCommitPercent) {
- verbose("softCommit start");
- assertU(TestHarness.commit("softCommit","true"));
- verbose("softCommit end");
- } else {
- verbose("hardCommit start");
- assertU(commit());
- verbose("hardCommit end");
- }
- }
-
- synchronized(TestRealTimeGet.this) {
- // install this model snapshot only if it's newer than the current one
- // install this model only if we are not in recovery mode.
- if (version >= committedModelClock) {
- if (VERBOSE) {
- verbose("installing new committedModel version="+committedModelClock);
- }
- committedModel = newCommittedModel;
- committedModelClock = version;
- }
- }
- }
- numCommitting.decrementAndGet();
- continue;
- }
-
-
- int id;
-
- if (rand.nextBoolean()) {
- id = rand.nextInt(ndocs);
- } else {
- id = lastId; // reuse the last ID half of the time to force more race conditions
- }
-
- // set the lastId before we actually change it sometimes to try and
- // uncover more race conditions between writing and reading
- boolean before = rand.nextBoolean();
- if (before) {
- lastId = id;
- }
-
- DocInfo info = model.get(id);
-
- long val = info.val;
- long nextVal = Math.abs(val)+1;
-
- // the version we set on the update should determine who wins
- // These versions are not derived from the actual leader update handler hand hence this
- // test may need to change depending on how we handle version numbers.
- long version = testVersion.incrementAndGet();
-
- // yield after getting the next version to increase the odds of updates happening out of order
- if (rand.nextBoolean()) Thread.yield();
-
- if (oper < commitPercent + deletePercent) {
- verbose("deleting id",id,"val=",nextVal,"version",version);
-
- Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
-
- // TODO: returning versions for these types of updates is redundant
- // but if we do return, they had better be equal
- if (returnedVersion != null) {
- assertEquals(-version, returnedVersion.longValue());
- }
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (Math.abs(version) > Math.abs(currInfo.version)) {
- model.put(id, new DocInfo(version, -nextVal));
- }
- }
-
- verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
- } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
-
- verbose("deleteByQuery id",id,"val=",nextVal,"version",version);
-
- Long returnedVersion = deleteByQueryAndGetVersion("id:"+Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
-
- // TODO: returning versions for these types of updates is redundant
- // but if we do return, they had better be equal
- if (returnedVersion != null) {
- assertEquals(-version, returnedVersion.longValue());
- }
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (Math.abs(version) > Math.abs(currInfo.version)) {
- model.put(id, new DocInfo(version, -nextVal));
- }
- }
-
- verbose("deleteByQuery id", id, "val=",nextVal,"version",version,"DONE");
-
- } else {
- verbose("adding id", id, "val=", nextVal,"version",version);
-
- Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
- if (returnedVersion != null) {
- assertEquals(version, returnedVersion.longValue());
- }
-
- // only update model if the version is newer
- synchronized (model) {
- DocInfo currInfo = model.get(id);
- if (version > currInfo.version) {
- model.put(id, new DocInfo(version, nextVal));
- }
- }
-
- if (VERBOSE) {
- verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
- }
-
- }
- // } // end sync
-
- if (!before) {
- lastId = id;
- }
- }
- } catch (Throwable e) {
- operations.set(-1L);
- throw new RuntimeException(e);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (int i=0; i<nReadThreads; i++) {
- Thread thread = new Thread("READER"+i) {
- Random rand = new Random(random().nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.get() > 0) {
- // throttle reads (don't completely stop)
- readPermission.tryAcquire(10, TimeUnit.MILLISECONDS);
-
-
- // bias toward a recently changed doc
- int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
-
- // when indexing, we update the index, then the model
- // so when querying, we should first check the model, and then the index
-
- boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
- DocInfo info;
-
- if (realTime) {
- info = visibleModel.get(id);
- } else {
- synchronized(TestRealTimeGet.this) {
- info = committedModel.get(id);
- }
- }
-
-
- if (VERBOSE) {
- verbose("querying id", id);
- }
- SolrQueryRequest sreq;
- if (realTime) {
- sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
- } else {
- sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
- }
-
- String response = h.query(sreq);
- Map rsp = (Map)ObjectBuilder.fromJSON(response);
- List doclist = (List)(((Map)rsp.get("response")).get("docs"));
- if (doclist.size() == 0) {
- // there's no info we can get back with a delete, so not much we can check without further synchronization
- } else {
- assertEquals(1, doclist.size());
- long foundVal = (Long)(((Map)doclist.get(0)).get(field));
- long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
- if (foundVer < Math.abs(info.version)
- || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
- verbose("ERROR, id=", id, "found=",response,"model",info);
- assertTrue(false);
- }
- }
- }
-
-
- if (rand.nextInt(100) < percentGetLatestVersions) {
- getLatestVersions();
- // TODO: some sort of validation that the latest version is >= to the latest version we added?
- }
-
- } catch (Throwable e) {
- operations.set(-1L);
- throw new RuntimeException(e);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (Thread thread : threads) {
- thread.start();
- }
-
- int bufferedAddsApplied = 0;
- do {
- assertTrue(uLog.getState() == UpdateLog.State.ACTIVE);
-
- // before we start buffering updates, we want to point
- // visibleModel away from the live model.
-
- visibleModel = new ConcurrentHashMap<Integer, DocInfo>(model);
-
- synchronized (stateChangeLock) {
- uLog.bufferUpdates();
- }
-
- assertTrue(uLog.getState() == UpdateLog.State.BUFFERING);
-
- // sometimes wait for a second to allow time for writers to write something
- if (random().nextBoolean()) Thread.sleep(random().nextInt(10)+1);
-
- Future<UpdateLog.RecoveryInfo> recoveryInfoF = uLog.applyBufferedUpdates();
- if (recoveryInfoF != null) {
- UpdateLog.RecoveryInfo recInfo = null;
-
- int writeThreadNumber = 0;
- while (recInfo == null) {
- try {
- // wait a short period of time for recovery to complete (and to give a chance for more writers to concurrently add docs)
- recInfo = recoveryInfoF.get(random().nextInt(100/nWriteThreads), TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- // idle one more write thread
- verbose("Operation",operations.get(),"Draining permits for write thread",writeThreadNumber);
- writePermissions[writeThreadNumber++].drainPermits();
- if (writeThreadNumber >= nWriteThreads) {
- // if we hit the end, back up and give a few write permits
- writeThreadNumber--;
- writePermissions[writeThreadNumber].release(random().nextInt(2) + 1);
- }
-
- // throttle readers so they don't steal too much CPU from the recovery thread
- readPermission.drainPermits();
- }
- }
-
- bufferedAddsApplied += recInfo.adds;
- }
-
- // put all writers back at full blast
- for (Semaphore writePerm : writePermissions) {
- // I don't think semaphores check for overflow, so we need to check mow many remain
- int neededPermits = Integer.MAX_VALUE - writePerm.availablePermits();
- if (neededPermits > 0) writePerm.release( neededPermits );
- }
-
- // put back readers at full blast and point back to live model
- visibleModel = model;
- int neededPermits = Integer.MAX_VALUE - readPermission.availablePermits();
- if (neededPermits > 0) readPermission.release( neededPermits );
-
- verbose("ROUND=",operations.get());
- } while (operations.decrementAndGet() > 0);
-
- verbose("bufferedAddsApplied=",bufferedAddsApplied);
-
- for (Thread thread : threads) {
- thread.join();
- }
-
- }
-
-
- List<Long> getLatestVersions() {
- List<Long> recentVersions;
- UpdateLog.RecentUpdates startingRecentUpdates = h.getCore().getUpdateHandler().getUpdateLog().getRecentUpdates();
- try {
- recentVersions = startingRecentUpdates.getVersions(100);
- } finally {
- startingRecentUpdates.close();
- }
- return recentVersions;
- }
-
-
-
-
-
-
- // The purpose of this test is to roughly model how solr uses lucene
- DirectoryReader reader;
- @Test
- public void testStressLuceneNRT() throws Exception {
- final int commitPercent = 5 + random().nextInt(20);
- final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
- final int deletePercent = 4+random().nextInt(25);
- final int deleteByQueryPercent = 1+random().nextInt(5);
- final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
- int nWriteThreads = 5 + random().nextInt(25);
-
- final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
-
- final AtomicLong operations = new AtomicLong(1000); // number of query operations to perform in total - crank up if
- int nReadThreads = 5 + random().nextInt(25);
- final boolean tombstones = random().nextBoolean();
- final boolean syncCommits = random().nextBoolean();
-
- verbose("commitPercent=", commitPercent);
- verbose("softCommitPercent=",softCommitPercent);
- verbose("deletePercent=",deletePercent);
- verbose("deleteByQueryPercent=", deleteByQueryPercent);
- verbose("ndocs=", ndocs);
- verbose("nWriteThreads=", nWriteThreads);
- verbose("nReadThreads=", nReadThreads);
- verbose("maxConcurrentCommits=", maxConcurrentCommits);
- verbose("operations=", operations);
- verbose("tombstones=", tombstones);
- verbose("syncCommits=", syncCommits);
-
- initModel(ndocs);
-
- final AtomicInteger numCommitting = new AtomicInteger();
-
- List<Thread> threads = new ArrayList<Thread>();
-
-
- final FieldType idFt = new FieldType();
- idFt.setIndexed(true);
- idFt.setStored(true);
- idFt.setOmitNorms(true);
- idFt.setTokenized(false);
- idFt.setIndexOptions(FieldInfo.IndexOptions.DOCS_ONLY);
-
- final FieldType ft2 = new FieldType();
- ft2.setIndexed(false);
- ft2.setStored(true);
-
-
- // model how solr does locking - only allow one thread to do a hard commit at once, and only one thread to do a soft commit, but
- // a hard commit in progress does not stop a soft commit.
- final Lock hardCommitLock = syncCommits ? new ReentrantLock() : null;
- final Lock reopenLock = syncCommits ? new ReentrantLock() : null;
-
-
- // RAMDirectory dir = new RAMDirectory();
- // final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40)));
-
- Directory dir = newDirectory();
-
- final RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())));
- writer.setDoRandomForceMergeAssert(false);
-
- // writer.commit();
- // reader = IndexReader.open(dir);
- // make this reader an NRT reader from the start to avoid the first non-writer openIfChanged
- // to only opening at the last commit point.
- reader = DirectoryReader.open(writer.w, true);
-
- for (int i=0; i<nWriteThreads; i++) {
- Thread thread = new Thread("WRITER"+i) {
- Random rand = new Random(random().nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.get() > 0) {
- int oper = rand.nextInt(100);
-
- if (oper < commitPercent) {
- if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,DocInfo> newCommittedModel;
- long version;
- DirectoryReader oldReader;
-
- boolean softCommit = rand.nextInt(100) < softCommitPercent;
-
- if (!softCommit) {
- // only allow one hard commit to proceed at once
- if (hardCommitLock != null) hardCommitLock.lock();
- verbose("hardCommit start");
-
- writer.commit();
- }
-
- if (reopenLock != null) reopenLock.lock();
-
- synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
- version = snapshotCount++;
- oldReader = reader;
- oldReader.incRef(); // increment the reference since we will use this for reopening
- }
-
- if (!softCommit) {
- // must commit after taking a snapshot of the model
- // writer.commit();
- }
-
- verbose("reopen start using", oldReader);
-
- DirectoryReader newReader;
- if (softCommit) {
- newReader = DirectoryReader.openIfChanged(oldReader, writer.w, true);
- } else {
- // will only open to last commit
- newReader = DirectoryReader.openIfChanged(oldReader);
- }
-
-
- if (newReader == null) {
- oldReader.incRef();
- newReader = oldReader;
- }
- oldReader.decRef();
-
- verbose("reopen result", newReader);
-
- synchronized(TestRealTimeGet.this) {
- assert newReader.getRefCount() > 0;
- assert reader.getRefCount() > 0;
-
- // install the new reader if it's newest (and check the current version since another reader may have already been installed)
- if (newReader.getVersion() > reader.getVersion()) {
- reader.decRef();
- reader = newReader;
-
- // install this snapshot only if it's newer than the current one
- if (version >= committedModelClock) {
- committedModel = newCommittedModel;
- committedModelClock = version;
- }
-
- } else {
- // close if unused
- newReader.decRef();
- }
-
- }
-
- if (reopenLock != null) reopenLock.unlock();
-
- if (!softCommit) {
- if (hardCommitLock != null) hardCommitLock.unlock();
- }
-
- }
- numCommitting.decrementAndGet();
- continue;
- }
-
-
- int id = rand.nextInt(ndocs);
- Object sync = syncArr[id];
-
- // set the lastId before we actually change it sometimes to try and
- // uncover more race conditions between writing and reading
- boolean before = rand.nextBoolean();
- if (before) {
- lastId = id;
- }
-
- // We can't concurrently update the same document and retain our invariants of increasing values
- // since we can't guarantee what order the updates will be executed.
- synchronized (sync) {
- DocInfo info = model.get(id);
- long val = info.val;
- long nextVal = Math.abs(val)+1;
-
- if (oper < commitPercent + deletePercent) {
- // add tombstone first
- if (tombstones) {
- Document d = new Document();
- d.add(new Field("id","-"+Integer.toString(id), idFt));
- d.add(new Field(field, Long.toString(nextVal), ft2));
- verbose("adding tombstone for id",id,"val=",nextVal);
- writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
- }
-
- verbose("deleting id",id,"val=",nextVal);
- writer.deleteDocuments(new Term("id",Integer.toString(id)));
- model.put(id, new DocInfo(0,-nextVal));
- verbose("deleting id",id,"val=",nextVal,"DONE");
-
- } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
- //assertU("<delete><query>id:" + id + "</query></delete>");
-
- // add tombstone first
- if (tombstones) {
- Document d = new Document();
- d.add(new Field("id","-"+Integer.toString(id), idFt));
- d.add(new Field(field, Long.toString(nextVal), ft2));
- verbose("adding tombstone for id",id,"val=",nextVal);
- writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
- }
-
- verbose("deleteByQuery",id,"val=",nextVal);
- writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
- model.put(id, new DocInfo(0,-nextVal));
- verbose("deleteByQuery",id,"val=",nextVal,"DONE");
- } else {
- // model.put(id, nextVal); // uncomment this and this test should fail.
-
- // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
- Document d = new Document();
- d.add(new Field("id",Integer.toString(id), idFt));
- d.add(new Field(field, Long.toString(nextVal), ft2));
- verbose("adding id",id,"val=",nextVal);
- writer.updateDocument(new Term("id", Integer.toString(id)), d);
- if (tombstones) {
- // remove tombstone after new addition (this should be optional?)
- verbose("deleting tombstone for id",id);
- writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
- verbose("deleting tombstone for id",id,"DONE");
- }
-
- model.put(id, new DocInfo(0,nextVal));
- verbose("adding id",id,"val=",nextVal,"DONE");
- }
- }
-
- if (!before) {
- lastId = id;
- }
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (int i=0; i<nReadThreads; i++) {
- Thread thread = new Thread("READER"+i) {
- Random rand = new Random(random().nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.decrementAndGet() >= 0) {
- // bias toward a recently changed doc
- int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
-
- // when indexing, we update the index, then the model
- // so when querying, we should first check the model, and then the index
-
- DocInfo info;
- synchronized(TestRealTimeGet.this) {
- info = committedModel.get(id);
- }
- long val = info.val;
-
- IndexReader r;
- synchronized(TestRealTimeGet.this) {
- r = reader;
- r.incRef();
- }
-
- int docid = getFirstMatch(r, new Term("id",Integer.toString(id)));
-
- if (docid < 0 && tombstones) {
- // if we couldn't find the doc, look for it's tombstone
- docid = getFirstMatch(r, new Term("id","-"+Integer.toString(id)));
- if (docid < 0) {
- if (val == -1L) {
- // expected... no doc was added yet
- r.decRef();
- continue;
- }
- verbose("ERROR: Couldn't find a doc or tombstone for id", id, "using reader",r,"expected value",val);
- fail("No documents or tombstones found for id " + id + ", expected at least " + val);
- }
- }
-
- if (docid < 0 && !tombstones) {
- // nothing to do - we can't tell anything from a deleted doc without tombstones
- } else {
- if (docid < 0) {
- verbose("ERROR: Couldn't find a doc for id", id, "using reader",r);
- }
- assertTrue(docid >= 0); // we should have found the document, or it's tombstone
- Document doc = r.document(docid);
- long foundVal = Long.parseLong(doc.get(field));
- if (foundVal < Math.abs(val)) {
- verbose("ERROR: id",id,"model_val=",val," foundVal=",foundVal,"reader=",reader);
- }
- assertTrue(foundVal >= Math.abs(val));
- }
-
- r.decRef();
- }
- } catch (Throwable e) {
- operations.set(-1L);
- throw new RuntimeException(e);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (Thread thread : threads) {
- thread.start();
- }
-
- for (Thread thread : threads) {
- thread.join();
- }
-
- writer.close();
- reader.close();
- dir.close();
- }
-
-
- public int getFirstMatch(IndexReader r, Term t) throws IOException {
- Fields fields = MultiFields.getFields(r);
- if (fields == null) return -1;
- Terms terms = fields.terms(t.field());
- if (terms == null) return -1;
- BytesRef termBytes = t.bytes();
- final TermsEnum termsEnum = terms.iterator(null);
- if (!termsEnum.seekExact(termBytes, false)) {
- return -1;
- }
- DocsEnum docs = termsEnum.docs(MultiFields.getLiveDocs(r), null, false);
- int id = docs.nextDoc();
- if (id != DocIdSetIterator.NO_MORE_DOCS) {
- int next = docs.nextDoc();
- assertEquals(DocIdSetIterator.NO_MORE_DOCS, next);
- }
- return id == DocIdSetIterator.NO_MORE_DOCS ? -1 : id;
- }
-
}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressLucene.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressLucene.java?rev=1357324&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressLucene.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressLucene.java Wed Jul 4 16:02:42 2012
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.solr.core.SolrCore.verbose;
+
+public class TestStressLucene extends TestRTGBase {
+
+ // The purpose of this test is to roughly model how solr uses lucene
+ DirectoryReader reader;
+ @Test
+ public void testStressLuceneNRT() throws Exception {
+ final int commitPercent = 5 + random().nextInt(20);
+ final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random().nextInt(25);
+ final int deleteByQueryPercent = 1+random().nextInt(5);
+ final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
+ int nWriteThreads = 5 + random().nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ final AtomicLong operations = new AtomicLong(100000); // number of query operations to perform in total
+ int nReadThreads = 5 + random().nextInt(25);
+ final boolean tombstones = random().nextBoolean();
+ final boolean syncCommits = random().nextBoolean();
+
+ verbose("commitPercent=", commitPercent);
+ verbose("softCommitPercent=",softCommitPercent);
+ verbose("deletePercent=",deletePercent);
+ verbose("deleteByQueryPercent=", deleteByQueryPercent);
+ verbose("ndocs=", ndocs);
+ verbose("nWriteThreads=", nWriteThreads);
+ verbose("nReadThreads=", nReadThreads);
+ verbose("maxConcurrentCommits=", maxConcurrentCommits);
+ verbose("operations=", operations);
+ verbose("tombstones=", tombstones);
+ verbose("syncCommits=", syncCommits);
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+
+ final FieldType idFt = new FieldType();
+ idFt.setIndexed(true);
+ idFt.setStored(true);
+ idFt.setOmitNorms(true);
+ idFt.setTokenized(false);
+ idFt.setIndexOptions(FieldInfo.IndexOptions.DOCS_ONLY);
+
+ final FieldType ft2 = new FieldType();
+ ft2.setIndexed(false);
+ ft2.setStored(true);
+
+
+ // model how solr does locking - only allow one thread to do a hard commit at once, and only one thread to do a soft commit, but
+ // a hard commit in progress does not stop a soft commit.
+ final Lock hardCommitLock = syncCommits ? new ReentrantLock() : null;
+ final Lock reopenLock = syncCommits ? new ReentrantLock() : null;
+
+
+ // RAMDirectory dir = new RAMDirectory();
+ // final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40)));
+
+ Directory dir = newDirectory();
+
+ final RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())));
+ writer.setDoRandomForceMergeAssert(false);
+
+ // writer.commit();
+ // reader = IndexReader.open(dir);
+ // make this reader an NRT reader from the start to avoid having the first non-writer openIfChanged
+ // only open to the last commit point.
+ reader = DirectoryReader.open(writer.w, true);
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random().nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+ DirectoryReader oldReader;
+
+ boolean softCommit = rand.nextInt(100) < softCommitPercent;
+
+ if (!softCommit) {
+ // only allow one hard commit to proceed at once
+ if (hardCommitLock != null) hardCommitLock.lock();
+ verbose("hardCommit start");
+
+ writer.commit();
+ }
+
+ if (reopenLock != null) reopenLock.lock();
+
+ synchronized(globalLock) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ oldReader = reader;
+ oldReader.incRef(); // increment the reference since we will use this for reopening
+ }
+
+ if (!softCommit) {
+ // must commit after taking a snapshot of the model
+ // writer.commit();
+ }
+
+ verbose("reopen start using", oldReader);
+
+ DirectoryReader newReader;
+ if (softCommit) {
+ newReader = DirectoryReader.openIfChanged(oldReader, writer.w, true);
+ } else {
+ // will only open to last commit
+ newReader = DirectoryReader.openIfChanged(oldReader);
+ }
+
+
+ if (newReader == null) {
+ oldReader.incRef();
+ newReader = oldReader;
+ }
+ oldReader.decRef();
+
+ verbose("reopen result", newReader);
+
+ synchronized(globalLock) {
+ assert newReader.getRefCount() > 0;
+ assert reader.getRefCount() > 0;
+
+ // install the new reader if it's newest (and check the current version since another reader may have already been installed)
+ if (newReader.getVersion() > reader.getVersion()) {
+ reader.decRef();
+ reader = newReader;
+
+ // install this snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+
+ } else {
+ // close if unused
+ newReader.decRef();
+ }
+
+ }
+
+ if (reopenLock != null) reopenLock.unlock();
+
+ if (!softCommit) {
+ if (hardCommitLock != null) hardCommitLock.unlock();
+ }
+
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id = rand.nextInt(ndocs);
+ Object sync = syncArr[id];
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ // We can't concurrently update the same document and retain our invariants of increasing values
+ // since we can't guarantee what order the updates will be executed.
+ synchronized (sync) {
+ DocInfo info = model.get(id);
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ if (oper < commitPercent + deletePercent) {
+ // add tombstone first
+ if (tombstones) {
+ Document d = new Document();
+ d.add(new Field("id","-"+Integer.toString(id), idFt));
+ d.add(new Field(field, Long.toString(nextVal), ft2));
+ verbose("adding tombstone for id",id,"val=",nextVal);
+ writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+ }
+
+ verbose("deleting id",id,"val=",nextVal);
+ writer.deleteDocuments(new Term("id",Integer.toString(id)));
+ model.put(id, new DocInfo(0,-nextVal));
+ verbose("deleting id",id,"val=",nextVal,"DONE");
+
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ //assertU("<delete><query>id:" + id + "</query></delete>");
+
+ // add tombstone first
+ if (tombstones) {
+ Document d = new Document();
+ d.add(new Field("id","-"+Integer.toString(id), idFt));
+ d.add(new Field(field, Long.toString(nextVal), ft2));
+ verbose("adding tombstone for id",id,"val=",nextVal);
+ writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+ }
+
+ verbose("deleteByQuery",id,"val=",nextVal);
+ writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
+ model.put(id, new DocInfo(0,-nextVal));
+ verbose("deleteByQuery",id,"val=",nextVal,"DONE");
+ } else {
+ // model.put(id, nextVal); // uncomment this and this test should fail.
+
+ // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+ Document d = new Document();
+ d.add(new Field("id",Integer.toString(id), idFt));
+ d.add(new Field(field, Long.toString(nextVal), ft2));
+ verbose("adding id",id,"val=",nextVal);
+ writer.updateDocument(new Term("id", Integer.toString(id)), d);
+ if (tombstones) {
+ // remove tombstone after new addition (this should be optional?)
+ verbose("deleting tombstone for id",id);
+ writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
+ verbose("deleting tombstone for id",id,"DONE");
+ }
+
+ model.put(id, new DocInfo(0,nextVal));
+ verbose("adding id",id,"val=",nextVal,"DONE");
+ }
+ }
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random().nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ DocInfo info;
+ synchronized(globalLock) {
+ info = committedModel.get(id);
+ }
+ long val = info.val;
+
+ IndexReader r;
+ synchronized(globalLock) {
+ r = reader;
+ r.incRef();
+ }
+
+ int docid = getFirstMatch(r, new Term("id",Integer.toString(id)));
+
+ if (docid < 0 && tombstones) {
+ // if we couldn't find the doc, look for its tombstone
+ docid = getFirstMatch(r, new Term("id","-"+Integer.toString(id)));
+ if (docid < 0) {
+ if (val == -1L) {
+ // expected... no doc was added yet
+ r.decRef();
+ continue;
+ }
+ verbose("ERROR: Couldn't find a doc or tombstone for id", id, "using reader",r,"expected value",val);
+ fail("No documents or tombstones found for id " + id + ", expected at least " + val);
+ }
+ }
+
+ if (docid < 0 && !tombstones) {
+ // nothing to do - we can't tell anything from a deleted doc without tombstones
+ } else {
+ if (docid < 0) {
+ verbose("ERROR: Couldn't find a doc for id", id, "using reader",r);
+ }
+ assertTrue(docid >= 0); // we should have found the document, or its tombstone
+ Document doc = r.document(docid);
+ long foundVal = Long.parseLong(doc.get(field));
+ if (foundVal < Math.abs(val)) {
+ verbose("ERROR: id",id,"model_val=",val," foundVal=",foundVal,"reader=",reader);
+ }
+ assertTrue(foundVal >= Math.abs(val));
+ }
+
+ r.decRef();
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ writer.close();
+ reader.close();
+ dir.close();
+ }
+
+
+
+}
|