Return-Path: X-Original-To: apmail-lucene-commits-archive@www.apache.org Delivered-To: apmail-lucene-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 34365C269 for ; Tue, 22 May 2012 00:36:35 +0000 (UTC) Received: (qmail 72567 invoked by uid 500); 22 May 2012 00:36:35 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 72556 invoked by uid 99); 22 May 2012 00:36:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 May 2012 00:36:34 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 May 2012 00:36:32 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 883E123889BB; Tue, 22 May 2012 00:36:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1341283 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/cloud/ java/org/apache/solr/handler/component/ java/org/apache/solr/update/ test/org/apache/solr/search/ Date: Tue, 22 May 2012 00:36:12 -0000 To: commits@lucene.apache.org From: yonik@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120522003612.883E123889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: yonik Date: Tue May 22 00:36:11 2012 New Revision: 1341283 URL: http://svn.apache.org/viewvc?rev=1341283&view=rev Log: SOLR-3469: prevent false peersync recovery by recording buffering flags in tlog Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1341283&r1=1341282&r2=1341283&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original) +++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Tue May 22 00:36:11 2012 @@ -226,40 +226,48 @@ public class RecoveryStrategy extends Th } - List startingRecentVersions; - UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates(); + List recentVersions; + UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates(); try { - startingRecentVersions = startingRecentUpdates.getVersions(ulog.numRecordsToKeep); + recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep); } finally { - startingRecentUpdates.close(); + recentUpdates.close(); } - List reallyStartingVersions = ulog.getStartingVersions(); + List startingVersions = ulog.getStartingVersions(); - if (reallyStartingVersions != null && recoveringAfterStartup) { + if (startingVersions != null && recoveringAfterStartup) { int oldIdx = 0; // index of the start of the old list in the current list - long firstStartingVersion = reallyStartingVersions.size() > 0 ? reallyStartingVersions.get(0) : 0; + long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0; - for (; oldIdx 0) { log.info("####### Found new versions added after startup: num=" + oldIdx); - log.info("###### currentVersions=" + startingRecentVersions); + log.info("###### currentVersions=" + recentVersions); } - log.info("###### startupVersions=" + reallyStartingVersions); + log.info("###### startupVersions=" + startingVersions); } + + boolean firstTime = true; + if (recoveringAfterStartup) { // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were - // when we went down. - startingRecentVersions = reallyStartingVersions; - } + // when we went down. We may have received updates since then. + recentVersions = startingVersions; - boolean firstTime = true; + if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) { + // last operation at the time of startup had the GAP flag set... + // this means we were previously doing a full index replication + // that probably didn't complete and buffering updates in the meantime. + firstTime = false; // skip peersync + } + } while (!successfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though try { @@ -287,7 +295,7 @@ public class RecoveryStrategy extends Th // + " i am:" + zkController.getNodeName()); PeerSync peerSync = new PeerSync(core, Collections.singletonList(leaderUrl), ulog.numRecordsToKeep); - peerSync.setStartingVersions(startingRecentVersions); + peerSync.setStartingVersions(recentVersions); boolean syncSuccess = peerSync.sync(); if (syncSuccess) { SolrQueryRequest req = new LocalSolrQueryRequest(core, Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1341283&r1=1341282&r2=1341283&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original) +++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Tue May 22 00:36:11 2012 @@ -141,7 +141,7 @@ public class RealTimeGetComponent extend // should currently be a List List entry = (List)o; assert entry.size() >= 3; - int oper = (Integer)entry.get(0); + int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK; switch (oper) { case UpdateLog.ADD: SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), req.getSchema()); @@ -211,7 +211,7 @@ public class RealTimeGetComponent extend // should currently be a List List entry = (List)o; assert entry.size() >= 3; - int oper = (Integer)entry.get(0); + int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK; switch (oper) { case UpdateLog.ADD: sid = (SolrInputDocument)entry.get(entry.size()-1); Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1341283&r1=1341282&r2=1341283&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original) +++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Tue May 22 00:36:11 2012 @@ -233,7 +233,7 @@ public class PeerSync { } // let's merge the lists - List newList = new ArrayList(ourUpdates); + List newList = new ArrayList(ourUpdates); for (Long ver : startingVersions) { if (Math.abs(ver) < smallestNewUpdate) { newList.add(ver); @@ -457,8 +457,8 @@ public class PeerSync { if (debug) { log.debug(msg() + "raw update record " + o); } - - int oper = (Integer)entry.get(0); + + int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK; long version = (Long) entry.get(1); if (version == lastVersion && version != 0) continue; lastVersion = version; Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1341283&r1=1341282&r2=1341283&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original) +++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java Tue May 22 00:36:11 2012 @@ -296,7 +296,7 @@ public class TransactionLog { } - public long write(AddUpdateCommand cmd) { + public long write(AddUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); long pos = 0; synchronized (this) { @@ -319,7 +319,7 @@ public class TransactionLog { codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.ADD); // should just take one byte + codec.writeInt(UpdateLog.ADD | flags); // should just take one byte codec.writeLong(cmd.getVersion()); codec.writeSolrInputDocument(cmd.getSolrInputDocument()); @@ -333,7 +333,7 @@ public class TransactionLog { } } - public long writeDelete(DeleteUpdateCommand cmd) { + public long writeDelete(DeleteUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); synchronized (this) { try { @@ -344,7 +344,7 @@ public class TransactionLog { } codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.DELETE); // should just take one byte + codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte codec.writeLong(cmd.getVersion()); BytesRef br = cmd.getIndexedId(); codec.writeByteArray(br.bytes, br.offset, br.length); @@ -359,7 +359,7 @@ public class TransactionLog { } } - public long writeDeleteByQuery(DeleteUpdateCommand cmd) { + public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); synchronized (this) { try { @@ -370,7 +370,7 @@ public class TransactionLog { } codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte + codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte codec.writeLong(cmd.getVersion()); codec.writeStr(cmd.query); @@ -385,7 +385,7 @@ public class TransactionLog { } - public long writeCommit(CommitUpdateCommand cmd) { + public long writeCommit(CommitUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); synchronized (this) { try { @@ -397,7 +397,7 @@ public class TransactionLog { } codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.COMMIT); // should just take one byte + codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte codec.writeLong(cmd.getVersion()); codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1341283&r1=1341282&r2=1341283&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original) +++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Tue May 22 00:36:11 2012 @@ -64,6 +64,12 @@ public class UpdateLog implements Plugin public static final int DELETE = 0x02; public static final int DELETE_BY_QUERY = 0x03; public static final int COMMIT = 0x04; + // Flag indicating that this is a buffered operation, and that a gap exists before buffering started. + // for example, if full index replication starts and we are buffering updates, then this flag should + // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should + // fail if this flag is set on the last update in the tlog). + public static final int FLAG_GAP = 0x10; + public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation public static class RecoveryInfo { public long positionOfStart; @@ -81,6 +87,7 @@ public class UpdateLog implements Plugin long id = -1; private State state = State.ACTIVE; + private int operationFlags; // flags to write in the transaction log with operations (i.e. FLAG_GAP) private TransactionLog tlog; private TransactionLog prevTlog; @@ -117,7 +124,7 @@ public class UpdateLog implements Plugin private volatile UpdateHandler uhandler; // a core reload can change this reference! private volatile boolean cancelApplyBufferUpdate; List startingVersions; - + int startingOperation; // last operation in the logs on startup public static class LogPtr { final long pointer; @@ -188,16 +195,18 @@ public class UpdateLog implements Plugin versionInfo = new VersionInfo(uhandler, 256); // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs. - UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates(); + UpdateLog.RecentUpdates startingUpdates = getRecentUpdates(); try { - startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep); + startingVersions = startingUpdates.getVersions(numRecordsToKeep); + startingOperation = startingUpdates.getLatestOperation(); + // populate recent deletes list (since we can't get that info from the index) - for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) { - DeleteUpdate du = startingRecentUpdates.deleteList.get(i); + for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) { + DeleteUpdate du = startingUpdates.deleteList.get(i); oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version)); } } finally { - startingRecentUpdates.close(); + startingUpdates.close(); } } @@ -210,6 +219,10 @@ public class UpdateLog implements Plugin return startingVersions; } + public int getStartingOperation() { + return startingOperation; + } + /* Takes over ownership of the log, keeping it until no longer needed and then decrementing it's reference and dropping it. */ @@ -275,7 +288,7 @@ public class UpdateLog implements Plugin // don't log if we are replaying from another log if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ensureLog(); - pos = tlog.write(cmd); + pos = tlog.write(cmd, operationFlags); } // TODO: in the future we could support a real position for a REPLAY update. @@ -302,7 +315,7 @@ public class UpdateLog implements Plugin // don't log if we are replaying from another log if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ensureLog(); - pos = tlog.writeDelete(cmd); + pos = tlog.writeDelete(cmd, operationFlags); } LogPtr ptr = new LogPtr(pos, cmd.version); @@ -326,7 +339,7 @@ public class UpdateLog implements Plugin // don't log if we are replaying from another log if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ensureLog(); - pos = tlog.writeDeleteByQuery(cmd); + pos = tlog.writeDeleteByQuery(cmd, operationFlags); } // only change our caches if we are not buffering @@ -417,7 +430,7 @@ public class UpdateLog implements Plugin if (prevTlog != null) { // if we made it through the commit, write a commit command to the log // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup. - prevTlog.writeCommit(cmd); + prevTlog.writeCommit(cmd, operationFlags); addOldLog(prevTlog, true); // the old log list will decref when no longer needed @@ -630,7 +643,7 @@ public class UpdateLog implements Plugin // record a commit log.info("Recording current closed for " + uhandler.core + " log=" + theLog); CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false); - theLog.writeCommit(cmd); + theLog.writeCommit(cmd, operationFlags); } theLog.deleteOnClose = false; @@ -685,7 +698,7 @@ public class UpdateLog implements Plugin HashMap updates; List deleteByQueryList; List deleteList; - + int latestOperation; public List getVersions(int n) { List ret = new ArrayList(n); @@ -719,6 +732,10 @@ public class UpdateLog implements Plugin return result; } + public int getLatestOperation() { + return latestOperation; + } + private void update() { int numUpdates = 0; @@ -743,7 +760,11 @@ public class UpdateLog implements Plugin List entry = (List)o; // TODO: refactor this out so we get common error handling - int oper = (Integer)entry.get(0); + int opAndFlags = (Integer)entry.get(0); + if (latestOperation == 0) { + latestOperation = opAndFlags; + } + int oper = opAndFlags & UpdateLog.OPERATION_MASK; long version = (Long) entry.get(1); switch (oper) { @@ -849,6 +870,9 @@ public class UpdateLog implements Plugin } state = State.BUFFERING; + + // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates + operationFlags |= FLAG_GAP; } finally { versionInfo.unblockUpdates(); } @@ -872,6 +896,7 @@ public class UpdateLog implements Plugin } state = State.ACTIVE; + operationFlags &= ~FLAG_GAP; } catch (IOException e) { SolrException.log(log,"Error attempting to roll back log", e); return false; @@ -904,6 +929,7 @@ public class UpdateLog implements Plugin } tlog.incref(); state = State.APPLYING_BUFFERED; + operationFlags &= ~FLAG_GAP; } finally { versionInfo.unblockUpdates(); } @@ -1002,6 +1028,7 @@ public class UpdateLog implements Plugin UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null)); long commitVersion = 0; + int operationAndFlags = 0; for(;;) { Object o = null; @@ -1046,7 +1073,8 @@ public class UpdateLog implements Plugin // should currently be a List List entry = (List)o; - int oper = (Integer)entry.get(0); + operationAndFlags = (Integer)entry.get(0); + int oper = operationAndFlags & OPERATION_MASK; long version = (Long) entry.get(1); switch (oper) { @@ -1136,7 +1164,10 @@ public class UpdateLog implements Plugin if (!activeLog) { // if we are replaying an old tlog file, we need to add a commit to the end // so we don't replay it again if we restart right after. - translog.writeCommit(cmd); + + // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it + // as the flag on the last operation. + translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK)); } try { Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1341283&r1=1341282&r2=1341283&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original) +++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java Tue May 22 00:36:11 2012 @@ -468,6 +468,93 @@ public class TestRecovery extends SolrTe } + @Test + public void testBufferingFlags() throws Exception { + + DirectUpdateHandler2.commitOnClose = false; + final Semaphore logReplayFinish = new Semaphore(0); + + UpdateLog.testing_logReplayFinishHook = new Runnable() { + @Override + public void run() { + logReplayFinish.release(); + } + }; + + + SolrQueryRequest req = req(); + UpdateHandler uhandler = req.getCore().getUpdateHandler(); + UpdateLog ulog = uhandler.getUpdateLog(); + + try { + clearIndex(); + assertU(commit()); + + assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); + ulog.bufferUpdates(); + + // simulate updates from a leader + updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + assertEquals(UpdateLog.State.BUFFERING, ulog.getState()); + + req.close(); + h.close(); + createCore(); + + req = req(); + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + + logReplayFinish.acquire(); // wait for replay to finish + + assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last + + // + // Try again to ensure that the previous log replay didn't wipe out our flags + // + + req.close(); + h.close(); + createCore(); + + req = req(); + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + + assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); + + // now do some normal non-buffered adds + updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + assertU(commit()); + + req.close(); + h.close(); + createCore(); + + req = req(); + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + + assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); + + + assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state + } finally { + DirectUpdateHandler2.commitOnClose = true; + UpdateLog.testing_logReplayHook = null; + UpdateLog.testing_logReplayFinishHook = null; + + req().close(); + } + + } + + + // make sure that on a restart, versions don't start too low @Test public void testVersionsOnRestart() throws Exception {