lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yo...@apache.org
Subject svn commit: r1341283 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/cloud/ java/org/apache/solr/handler/component/ java/org/apache/solr/update/ test/org/apache/solr/search/
Date Tue, 22 May 2012 00:36:12 GMT
Author: yonik
Date: Tue May 22 00:36:11 2012
New Revision: 1341283

URL: http://svn.apache.org/viewvc?rev=1341283&view=rev
Log:
SOLR-3469: prevent false peersync recovery by recording buffering flags in tlog

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1341283&r1=1341282&r2=1341283&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Tue May
22 00:36:11 2012
@@ -226,40 +226,48 @@ public class RecoveryStrategy extends Th
     }
 
 
-    List<Long> startingRecentVersions;
-    UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates();
+    List<Long> recentVersions;
+    UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
     try {
-      startingRecentVersions = startingRecentUpdates.getVersions(ulog.numRecordsToKeep);
+      recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep);
     } finally {
-      startingRecentUpdates.close();
+      recentUpdates.close();
     }
 
-    List<Long> reallyStartingVersions = ulog.getStartingVersions();
+    List<Long> startingVersions = ulog.getStartingVersions();
 
 
-    if (reallyStartingVersions != null && recoveringAfterStartup) {
+    if (startingVersions != null && recoveringAfterStartup) {
       int oldIdx = 0;  // index of the start of the old list in the current list
-      long firstStartingVersion = reallyStartingVersions.size() > 0 ? reallyStartingVersions.get(0)
: 0;
+      long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0)
: 0;
 
-      for (; oldIdx<startingRecentVersions.size(); oldIdx++) {
-        if (startingRecentVersions.get(oldIdx) == firstStartingVersion) break;
+      for (; oldIdx<recentVersions.size(); oldIdx++) {
+        if (recentVersions.get(oldIdx) == firstStartingVersion) break;
       }
 
       if (oldIdx > 0) {
         log.info("####### Found new versions added after startup: num=" + oldIdx);
-        log.info("###### currentVersions=" + startingRecentVersions);
+        log.info("###### currentVersions=" + recentVersions);
       }
 
-      log.info("###### startupVersions=" + reallyStartingVersions);
+      log.info("###### startupVersions=" + startingVersions);
     }
 
+
+    boolean firstTime = true;
+
     if (recoveringAfterStartup) {
       // if we're recovering after startup (i.e. we have been down), then we need to know
what the last versions were
-      // when we went down.
-      startingRecentVersions = reallyStartingVersions;
-    }
+      // when we went down.  We may have received updates since then.
+      recentVersions = startingVersions;
 
-    boolean firstTime = true;
+      if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
+        // last operation at the time of startup had the GAP flag set...
+        // this means we were previously doing a full index replication
+        // that probably didn't complete and buffering updates in the meantime.
+        firstTime = false;    // skip peersync
+      }
+    }
 
     while (!successfulRecovery && !close && !isInterrupted()) { // don't
use interruption or it will close channels though
       try {
@@ -287,7 +295,7 @@ public class RecoveryStrategy extends Th
           // + " i am:" + zkController.getNodeName());
           PeerSync peerSync = new PeerSync(core,
               Collections.singletonList(leaderUrl), ulog.numRecordsToKeep);
-          peerSync.setStartingVersions(startingRecentVersions);
+          peerSync.setStartingVersions(recentVersions);
           boolean syncSuccess = peerSync.sync();
           if (syncSuccess) {
             SolrQueryRequest req = new LocalSolrQueryRequest(core,

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1341283&r1=1341282&r2=1341283&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
(original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
Tue May 22 00:36:11 2012
@@ -141,7 +141,7 @@ public class RealTimeGetComponent extend
            // should currently be a List<Oper,Ver,Doc/Id>
            List entry = (List)o;
            assert entry.size() >= 3;
-           int oper = (Integer)entry.get(0);
+           int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
            switch (oper) {
              case UpdateLog.ADD:
                SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1),
req.getSchema());
@@ -211,7 +211,7 @@ public class RealTimeGetComponent extend
           // should currently be a List<Oper,Ver,Doc/Id>
           List entry = (List)o;
           assert entry.size() >= 3;
-          int oper = (Integer)entry.get(0);
+          int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
           switch (oper) {
             case UpdateLog.ADD:
               sid = (SolrInputDocument)entry.get(entry.size()-1);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1341283&r1=1341282&r2=1341283&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Tue May 22 00:36:11
2012
@@ -233,7 +233,7 @@ public class PeerSync  {
       }
 
       // let's merge the lists
-      List<Long> newList = new ArrayList(ourUpdates);
+      List<Long> newList = new ArrayList<Long>(ourUpdates);
       for (Long ver : startingVersions) {
         if (Math.abs(ver) < smallestNewUpdate) {
           newList.add(ver);
@@ -457,8 +457,8 @@ public class PeerSync  {
         if (debug) {
           log.debug(msg() + "raw update record " + o);
         }
-        
-        int oper = (Integer)entry.get(0);
+
+        int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
         long version = (Long) entry.get(1);
         if (version == lastVersion && version != 0) continue;
         lastVersion = version;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1341283&r1=1341282&r2=1341283&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java Tue May
22 00:36:11 2012
@@ -296,7 +296,7 @@ public class TransactionLog {
   }
 
 
-  public long write(AddUpdateCommand cmd) {
+  public long write(AddUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
     long pos = 0;
     synchronized (this) {
@@ -319,7 +319,7 @@ public class TransactionLog {
 
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.ADD);  // should just take one byte
+        codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
 
@@ -333,7 +333,7 @@ public class TransactionLog {
     }
   }
 
-  public long writeDelete(DeleteUpdateCommand cmd) {
+  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
     synchronized (this) {
       try {
@@ -344,7 +344,7 @@ public class TransactionLog {
         }
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE);  // should just take one byte
+        codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         BytesRef br = cmd.getIndexedId();
         codec.writeByteArray(br.bytes, br.offset, br.length);
@@ -359,7 +359,7 @@ public class TransactionLog {
     }
   }
 
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
     synchronized (this) {
       try {
@@ -370,7 +370,7 @@ public class TransactionLog {
         }
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
+        codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeStr(cmd.query);
 
@@ -385,7 +385,7 @@ public class TransactionLog {
   }
 
 
-  public long writeCommit(CommitUpdateCommand cmd) {
+  public long writeCommit(CommitUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
     synchronized (this) {
       try {
@@ -397,7 +397,7 @@ public class TransactionLog {
         }
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1341283&r1=1341282&r2=1341283&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Tue May 22 00:36:11
2012
@@ -64,6 +64,12 @@ public class UpdateLog implements Plugin
   public static final int DELETE = 0x02;
   public static final int DELETE_BY_QUERY = 0x03;
   public static final int COMMIT = 0x04;
+  // Flag indicating that this is a buffered operation, and that a gap exists before buffering
started.
+  // for example, if full index replication starts and we are buffering updates, then this
flag should
+  // be set to indicate that replaying the log would not bring us into sync (i.e. peersync
should
+  // fail if this flag is set on the last update in the tlog).
+  public static final int FLAG_GAP = 0x10;
+  public static final int OPERATION_MASK = 0x0f;  // mask off flags to get the operation
 
   public static class RecoveryInfo {
     public long positionOfStart;
@@ -81,6 +87,7 @@ public class UpdateLog implements Plugin
 
   long id = -1;
   private State state = State.ACTIVE;
+  private int operationFlags;  // flags to write in the transaction log with operations (i.e.
FLAG_GAP)
 
   private TransactionLog tlog;
   private TransactionLog prevTlog;
@@ -117,7 +124,7 @@ public class UpdateLog implements Plugin
   private volatile UpdateHandler uhandler;    // a core reload can change this reference!
   private volatile boolean cancelApplyBufferUpdate;
   List<Long> startingVersions;
-
+  int startingOperation;  // last operation in the logs on startup
 
   public static class LogPtr {
     final long pointer;
@@ -188,16 +195,18 @@ public class UpdateLog implements Plugin
     versionInfo = new VersionInfo(uhandler, 256);
 
     // TODO: these startingVersions assume that we successfully recover from all non-complete
tlogs.
-    UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates();
+    UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
     try {
-      startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep);
+      startingVersions = startingUpdates.getVersions(numRecordsToKeep);
+      startingOperation = startingUpdates.getLatestOperation();
+
       // populate recent deletes list (since we can't get that info from the index)
-      for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) {
-        DeleteUpdate du = startingRecentUpdates.deleteList.get(i);
+      for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
+        DeleteUpdate du = startingUpdates.deleteList.get(i);
         oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
       }
     } finally {
-      startingRecentUpdates.close();
+      startingUpdates.close();
     }
 
   }
@@ -210,6 +219,10 @@ public class UpdateLog implements Plugin
     return startingVersions;
   }
 
+  public int getStartingOperation() {
+    return startingOperation;
+  }
+
   /* Takes over ownership of the log, keeping it until no longer needed
      and then decrementing it's reference and dropping it.
    */
@@ -275,7 +288,7 @@ public class UpdateLog implements Plugin
       // don't log if we are replaying from another log
       if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
         ensureLog();
-        pos = tlog.write(cmd);
+        pos = tlog.write(cmd, operationFlags);
       }
 
       // TODO: in the future we could support a real position for a REPLAY update.
@@ -302,7 +315,7 @@ public class UpdateLog implements Plugin
       // don't log if we are replaying from another log
       if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
         ensureLog();
-        pos = tlog.writeDelete(cmd);
+        pos = tlog.writeDelete(cmd, operationFlags);
       }
 
       LogPtr ptr = new LogPtr(pos, cmd.version);
@@ -326,7 +339,7 @@ public class UpdateLog implements Plugin
       // don't log if we are replaying from another log
       if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
         ensureLog();
-        pos = tlog.writeDeleteByQuery(cmd);
+        pos = tlog.writeDeleteByQuery(cmd, operationFlags);
       }
 
       // only change our caches if we are not buffering
@@ -417,7 +430,7 @@ public class UpdateLog implements Plugin
       if (prevTlog != null) {
         // if we made it through the commit, write a commit command to the log
         // TODO: check that this works to cap a tlog we were using to buffer so we don't
replay on startup.
-        prevTlog.writeCommit(cmd);
+        prevTlog.writeCommit(cmd, operationFlags);
 
         addOldLog(prevTlog, true);
         // the old log list will decref when no longer needed
@@ -630,7 +643,7 @@ public class UpdateLog implements Plugin
         // record a commit
         log.info("Recording current closed for " + uhandler.core + " log=" + theLog);
         CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core,
new ModifiableSolrParams((SolrParams)null)), false);
-        theLog.writeCommit(cmd);
+        theLog.writeCommit(cmd, operationFlags);
       }
 
       theLog.deleteOnClose = false;
@@ -685,7 +698,7 @@ public class UpdateLog implements Plugin
     HashMap<Long, Update> updates;
     List<Update> deleteByQueryList;
     List<DeleteUpdate> deleteList;
-
+    int latestOperation;
 
     public List<Long> getVersions(int n) {
       List<Long> ret = new ArrayList(n);
@@ -719,6 +732,10 @@ public class UpdateLog implements Plugin
       return result;
     }
 
+    public int getLatestOperation() {
+      return latestOperation;
+    }
+
 
     private void update() {
       int numUpdates = 0;
@@ -743,7 +760,11 @@ public class UpdateLog implements Plugin
               List entry = (List)o;
 
               // TODO: refactor this out so we get common error handling
-              int oper = (Integer)entry.get(0);
+              int opAndFlags = (Integer)entry.get(0);
+              if (latestOperation == 0) {
+                latestOperation = opAndFlags;
+              }
+              int oper = opAndFlags & UpdateLog.OPERATION_MASK;
               long version = (Long) entry.get(1);
 
               switch (oper) {
@@ -849,6 +870,9 @@ public class UpdateLog implements Plugin
       }
 
       state = State.BUFFERING;
+
+      // currently, buffering is only called by recovery, meaning that there is most likely
a gap in updates
+      operationFlags |= FLAG_GAP;
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -872,6 +896,7 @@ public class UpdateLog implements Plugin
       }
 
       state = State.ACTIVE;
+      operationFlags &= ~FLAG_GAP;
     } catch (IOException e) {
       SolrException.log(log,"Error attempting to roll back log", e);
       return false;
@@ -904,6 +929,7 @@ public class UpdateLog implements Plugin
       }
       tlog.incref();
       state = State.APPLYING_BUFFERED;
+      operationFlags &= ~FLAG_GAP;
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1002,6 +1028,7 @@ public class UpdateLog implements Plugin
         UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req,
rsp, null));
 
         long commitVersion = 0;
+        int operationAndFlags = 0;
 
         for(;;) {
           Object o = null;
@@ -1046,7 +1073,8 @@ public class UpdateLog implements Plugin
             // should currently be a List<Oper,Ver,Doc/Id>
             List entry = (List)o;
 
-            int oper = (Integer)entry.get(0);
+            operationAndFlags = (Integer)entry.get(0);
+            int oper = operationAndFlags & OPERATION_MASK;
             long version = (Long) entry.get(1);
 
             switch (oper) {
@@ -1136,7 +1164,10 @@ public class UpdateLog implements Plugin
         if (!activeLog) {
           // if we are replaying an old tlog file, we need to add a commit to the end
           // so we don't replay it again if we restart right after.
-          translog.writeCommit(cmd);
+
+          // if the last operation we replayed had FLAG_GAP set, we want to use that again
so we don't lose it
+          // as the flag on the last operation.
+          translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK));
         }
 
         try {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1341283&r1=1341282&r2=1341283&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java Tue May 22
00:36:11 2012
@@ -468,6 +468,93 @@ public class TestRecovery extends SolrTe
   }
 
 
+  @Test
+  public void testBufferingFlags() throws Exception {
+
+    DirectUpdateHandler2.commitOnClose = false;
+    final Semaphore logReplayFinish = new Semaphore(0);
+
+    UpdateLog.testing_logReplayFinishHook = new Runnable() {
+      @Override
+      public void run() {
+        logReplayFinish.release();
+      }
+    };
+
+
+    SolrQueryRequest req = req();
+    UpdateHandler uhandler = req.getCore().getUpdateHandler();
+    UpdateLog ulog = uhandler.getUpdateLog();
+
+    try {
+      clearIndex();
+      assertU(commit());
+
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+      ulog.bufferUpdates();
+
+      // simulate updates from a leader
+      updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+      req.close();
+      h.close();
+      createCore();
+
+      req = req();
+      uhandler = req.getCore().getUpdateHandler();
+      ulog = uhandler.getUpdateLog();
+
+      logReplayFinish.acquire();  // wait for replay to finish
+
+      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);   // since
we died while buffering, we should see this last
+
+      //
+      // Try again to ensure that the previous log replay didn't wipe out our flags
+      //
+
+      req.close();
+      h.close();
+      createCore();
+
+      req = req();
+      uhandler = req.getCore().getUpdateHandler();
+      ulog = uhandler.getUpdateLog();
+
+      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
+
+      // now do some normal non-buffered adds
+      updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+      assertU(commit());
+
+      req.close();
+      h.close();
+      createCore();
+
+      req = req();
+      uhandler = req.getCore().getUpdateHandler();
+      ulog = uhandler.getUpdateLog();
+
+      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
+
+
+      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in
a good state
+    } finally {
+      DirectUpdateHandler2.commitOnClose = true;
+      UpdateLog.testing_logReplayHook = null;
+      UpdateLog.testing_logReplayFinishHook = null;
+
+      req().close();
+    }
+
+  }
+
+
+
   // make sure that on a restart, versions don't start too low
   @Test
   public void testVersionsOnRestart() throws Exception {



Mime
View raw message