lucene-commits mailing list archives

From: da...@apache.org
Subject: [1/2] lucene-solr:branch_7x: SOLR-9922: Write buffering updates to another tlog
Date: Mon, 04 Jun 2018 04:33:32 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 88400a147 -> 2a6f4862f
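
For orientation before the hunks: rather than tagging buffered updates into the regular tlog with operation flags (FLAG_GAP), the patch routes any command carrying UpdateCommand.BUFFERING into a dedicated buffer tlog (files prefixed "buffer.tlog."). The toy class below is a minimal sketch of that routing decision only; apart from the BUFFERING flag and the buffer-tlog idea taken from the diff, every identifier is invented for illustration and is not Solr API.

import java.util.ArrayList;
import java.util.List;

/** Toy model (not Solr code) of the routing rule introduced by this patch. */
class BufferRoutingSketch {
  static final int BUFFERING = 0x01;                 // stand-in for UpdateCommand.BUFFERING

  final List<String> mainTlog = new ArrayList<>();   // stand-in for the regular tlog
  List<String> bufferTlog;                           // created lazily, like ensureBufferTlog()

  void add(String cmd, int flags) {
    if ((flags & BUFFERING) != 0) {
      if (bufferTlog == null) bufferTlog = new ArrayList<>();  // lazily create the buffer tlog
      bufferTlog.add(cmd);                           // buffered updates never touch the main tlog
      return;
    }
    mainTlog.add(cmd);                               // normal path; no operation flags needed anymore
  }

  public static void main(String[] args) {
    BufferRoutingSketch log = new BufferRoutingSketch();
    log.add("add id=1", 0);
    log.add("add id=2", BUFFERING);
    System.out.println("main=" + log.mainTlog + " buffer=" + log.bufferTlog);
  }
}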


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2a6f4862/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 7f821ea..1bda23f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -96,6 +96,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   private static final long STATUS_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
   public static String LOG_FILENAME_PATTERN = "%s.%019d";
   public static String TLOG_NAME="tlog";
+  public static String BUFFER_TLOG_NAME="buffer.tlog";
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private boolean debug = log.isDebugEnabled();
@@ -139,11 +140,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   public static final int DELETE_BY_QUERY = 0x03;
   public static final int COMMIT = 0x04;
   public static final int UPDATE_INPLACE = 0x08;
-  // Flag indicating that this is a buffered operation, and that a gap exists before buffering started.
-  // for example, if full index replication starts and we are buffering updates, then this flag should
-  // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should
-  // fail if this flag is set on the last update in the tlog).
-  public static final int FLAG_GAP = 0x10;
+  // For backward-compatibility, we should delete this field in 9.0
   public static final int OPERATION_MASK = 0x0f;  // mask off flags to get the operation
 
   /**
@@ -186,8 +183,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
   long id = -1;
   protected State state = State.ACTIVE;
-  protected int operationFlags;  // flags to write in the transaction log with operations (i.e. FLAG_GAP)
 
+  protected TransactionLog bufferTlog;
   protected TransactionLog tlog;
   protected TransactionLog prevTlog;
   protected final Deque<TransactionLog> logs = new LinkedList<>();  // list of recent logs, newest first
@@ -206,6 +203,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   protected int maxNumLogsToKeep;
   protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
   protected Long maxVersionFromIndex = null;
+  protected boolean existOldBufferLog = false;
 
   // keep track of deletes only... this is not updated on an add
   protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
@@ -244,7 +242,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   volatile UpdateHandler uhandler;    // a core reload can change this reference!
   protected volatile boolean cancelApplyBufferUpdate;
   List<Long> startingVersions;
-  int startingOperation;  // last operation in the logs on startup
 
   // metrics
   protected Gauge<Integer> bufferedOpsGauge;
@@ -378,6 +375,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
     }
 
+    String[] oldBufferTlog = getBufferLogList(tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      existOldBufferLog = true;
+    }
     TransactionLog oldLog = null;
     for (String oldLogName : tlogFiles) {
       File f = new File(tlogDir, oldLogName);
@@ -408,7 +409,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
     try (RecentUpdates startingUpdates = getRecentUpdates()) {
       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the index)
       for (int i = startingUpdates.deleteList.size() - 1; i >= 0; i--) {
@@ -434,14 +434,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     this.metricManager = manager;
     this.registryName = registry;
     bufferedOpsGauge = () -> {
+      if (state == State.BUFFERING) {
+        if (bufferTlog == null) return  0;
+        // numRecords counts header as a record
+        return bufferTlog.numRecords() - 1;
+      }
       if (tlog == null) {
         return 0;
       } else if (state == State.APPLYING_BUFFERED) {
         // numRecords counts header as a record
         return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors;
-      } else if (state == State.BUFFERING) {
-        // numRecords counts header as a record
-        return tlog.numRecords() - 1;
       } else {
         return 0;
       }
@@ -472,8 +474,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     return startingVersions;
   }
 
-  public int getStartingOperation() {
-    return startingOperation;
+  public boolean existOldBufferLog() {
+    return existOldBufferLog;
   }
 
   /* Takes over ownership of the log, keeping it until no longer needed
@@ -509,6 +511,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     logs.addFirst(oldLog);
   }
 
+  public String[] getBufferLogList(File directory) {
+    final String prefix = BUFFER_TLOG_NAME+'.';
+    return directory.list((dir, name) -> name.startsWith(prefix));
+  }
+
+  /**
+   * Is this update coming from the old tlogs (not from the buffer tlog)?
+   * If so, we must skip writing {@code cmd} to the current tlog.
+   */
+  private boolean updateFromOldTlogs(UpdateCommand cmd) {
+    return (cmd.getFlags() & UpdateCommand.REPLAY) != 0 && state == State.REPLAYING;
+  }
+
   public String[] getLogList(File directory) {
     final String prefix = TLOG_NAME+'.';
     String[] names = directory.list(new FilenameFilter() {
@@ -541,14 +556,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
 
     synchronized (this) {
-      long pos = -1;
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.write(cmd);
+        return;
+      }
 
+      long pos = -1;
       long prevPointer = getPrevPointerForUpdate(cmd);
 
       // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      if (!updateFromOldTlogs(cmd)) {
         ensureLog();
-        pos = tlog.write(cmd, prevPointer, operationFlags);
+        pos = tlog.write(cmd, prevPointer);
       }
 
       if (!clearCaches) {
@@ -556,10 +576,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         // Only currently would be useful for RTG while in recovery mode though.
         LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer);
 
-        // only update our map if we're not buffering
-        if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
-          map.put(cmd.getIndexedId(), ptr);
-        }
+        map.put(cmd.getIndexedId(), ptr);
 
         if (trace) {
           log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
@@ -606,22 +623,21 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     BytesRef br = cmd.getIndexedId();
 
     synchronized (this) {
-      long pos = -1;
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.writeDelete(cmd);
+        return;
+      }
 
-      // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      long pos = -1;
+      if (!updateFromOldTlogs(cmd)) {
         ensureLog();
-        pos = tlog.writeDelete(cmd, operationFlags);
+        pos = tlog.writeDelete(cmd);
       }
 
       LogPtr ptr = new LogPtr(pos, cmd.version);
-
-      // only update our map if we're not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
-        map.put(br, ptr);
-
-        oldDeletes.put(br, ptr);
-      }
+      map.put(br, ptr);
+      oldDeletes.put(br, ptr);
 
       if (trace) {
         log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
@@ -631,15 +647,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
   public void deleteByQuery(DeleteUpdateCommand cmd) {
     synchronized (this) {
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.writeDeleteByQuery(cmd);
+        return;
+      }
+
       long pos = -1;
-      // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      if (!updateFromOldTlogs(cmd)) {
         ensureLog();
-        pos = tlog.writeDeleteByQuery(cmd, operationFlags);
+        pos = tlog.writeDeleteByQuery(cmd);
       }
 
-      // only change our caches if we are not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
+      // skip purging our caches in the case of a tlog replica
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
         // given that we just did a delete-by-query, we don't know what documents were
         // affected and hence we must purge our caches.
         openRealtimeSearcher();
@@ -802,7 +823,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       if (prevTlog != null) {
         // if we made it through the commit, write a commit command to the log
         // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
-        prevTlog.writeCommit(cmd, operationFlags);
+        prevTlog.writeCommit(cmd);
 
         addOldLog(prevTlog, true);
         // the old log list will decref when no longer needed
@@ -1152,9 +1173,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      operationFlags &= ~FLAG_GAP;
-      state = State.ACTIVE;
-      copyAndSwitchToNewTlog(cuc);
+      synchronized (this) {
+        state = State.ACTIVE;
+        if (bufferTlog == null) {
+          return;
+        }
+        // by calling this, we won't switch to a new tlog (unlike applyBufferedUpdates());
+        // if we switched to a new tlog we could possibly lose updates on the next fetch
+        copyOverOldUpdates(cuc.getVersion(), bufferTlog);
+        dropBufferTlog();
+      }
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1165,33 +1193,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    * So any updates which hasn't made it to the index is preserved in the current tlog
    * @param cuc any updates that have version larger than the version of cuc will be copied over
    */
-  public void copyOverOldUpdates(CommitUpdateCommand cuc) {
+  public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      copyAndSwitchToNewTlog(cuc);
+      synchronized (this) {
+        if (tlog == null) {
+          return;
+        }
+        preCommit(cuc);
+        try {
+          copyOverOldUpdates(cuc.getVersion());
+        } finally {
+          postCommit(cuc);
+        }
+      }
     } finally {
       versionInfo.unblockUpdates();
     }
   }
 
-  protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
-    synchronized (this) {
-      if (tlog == null) {
-        return;
-      }
-      preCommit(cuc);
-      try {
-        copyOverOldUpdates(cuc.getVersion());
-      } finally {
-        postCommit(cuc);
-      }
-    }
-  }
-
-  /**
-   * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
-   * @param commitVersion any updates that have version larger than the commitVersion will be copied over
-   */
   public void copyOverOldUpdates(long commitVersion) {
     TransactionLog oldTlog = prevTlog;
     if (oldTlog == null && !logs.isEmpty()) {
@@ -1207,6 +1227,14 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       log.warn("Exception reading log", e);
       return;
     }
+    copyOverOldUpdates(commitVersion, oldTlog);
+  }
+
+  /**
+   * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
+   * @param commitVersion any updates that have version larger than the commitVersion will be copied over
+   */
+  public void copyOverOldUpdates(long commitVersion, TransactionLog oldTlog) {
     copyOverOldUpdatesMeter.mark();
 
     SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
@@ -1270,6 +1298,22 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     }
   }
 
+  protected void ensureBufferTlog() {
+    if (bufferTlog != null) return;
+    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
+    bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
+  }
+
+  // Cleanup old buffer tlogs
+  protected void deleteBufferLogs() {
+    String[] oldBufferTlog = getBufferLogList(tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      for (String oldBufferLogName : oldBufferTlog) {
+        deleteFile(new File(tlogDir, oldBufferLogName));
+      }
+    }
+  }
+
 
   protected void ensureLog() {
     if (tlog == null) {
@@ -1285,7 +1329,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         // record a commit
         log.info("Recording current closed for " + uhandler.core + " log=" + theLog);
         CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
-        theLog.writeCommit(cmd, operationFlags);
+        theLog.writeCommit(cmd);
       }
 
       theLog.deleteOnClose = false;
@@ -1314,6 +1358,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         log.forceClose();
       }
 
+      if (bufferTlog != null) {
+        // should not delete bufferTlog on close; an existing bufferTlog is a signal to skip peerSync
+        bufferTlog.deleteOnClose = false;
+        bufferTlog.decref();
+        bufferTlog.forceClose();
+      }
+
       try {
         ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
       } catch (Exception e) {
@@ -1347,7 +1398,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     HashMap<Long, Update> updates;
     List<Update> deleteByQueryList;
     List<DeleteUpdate> deleteList;
-    int latestOperation;
 
     public RecentUpdates(Deque<TransactionLog> logList) {
       this.logList = logList;
@@ -1401,11 +1451,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       return result;
     }
 
-    public int getLatestOperation() {
-      return latestOperation;
-    }
-
-
     private void update() {
       int numUpdates = 0;
       updateList = new ArrayList<>(logList.size());
@@ -1431,9 +1476,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
               // TODO: refactor this out so we get common error handling
               int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX);
-              if (latestOperation == 0) {
-                latestOperation = opAndFlags;
-              }
               int oper = opAndFlags & UpdateLog.OPERATION_MASK;
               long version = (Long) entry.get(UpdateLog.VERSION_IDX);
 
@@ -1525,6 +1567,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         tlog.incref();
         logList.addFirst(tlog);
       }
+      if (bufferTlog != null) {
+        bufferTlog.incref();
+        logList.addFirst(bufferTlog);
+      }
     }
 
     // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
@@ -1542,13 +1588,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     // reading state and acting on it in the distributed update processor
     versionInfo.blockUpdates();
     try {
-      if (state == State.BUFFERING) {
-        log.info("Restarting buffering. previous=" + recoveryInfo);
-      } else if (state != State.ACTIVE) {
+      if (state != State.ACTIVE && state != State.BUFFERING) {
         // we don't currently have support for handling other states
         log.warn("Unexpected state for bufferUpdates: " + state + ", Ignoring request.");
         return;
       }
+      dropBufferTlog();
+      deleteBufferLogs();
 
       recoveryInfo = new RecoveryInfo();
 
@@ -1556,15 +1602,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         log.info("Starting to buffer updates. " + this);
       }
 
-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
-      synchronized (this) {
-        recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
-      }
-
       state = State.BUFFERING;
-
-      // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates
-      operationFlags |= FLAG_GAP;
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1580,25 +1618,24 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         log.info("Dropping buffered updates " + this);
       }
 
-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
-      synchronized (this) {
-        if (tlog != null) {
-          tlog.rollback(recoveryInfo.positionOfStart);
-        }
-      }
+      dropBufferTlog();
 
       state = State.ACTIVE;
-      operationFlags &= ~FLAG_GAP;
-    } catch (IOException e) {
-      SolrException.log(log,"Error attempting to roll back log", e);
-      return false;
-    }
-    finally {
+    } finally {
       versionInfo.unblockUpdates();
     }
     return true;
   }
 
+  private void dropBufferTlog() {
+    synchronized (this) {
+      if (bufferTlog != null) {
+        bufferTlog.decref();
+        bufferTlog = null;
+      }
+    }
+  }
+
 
   /** Returns the Future to wait on, or null if no replay was needed */
   public Future<RecoveryInfo> applyBufferedUpdates() {
@@ -1612,27 +1649,30 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     try {
       cancelApplyBufferUpdate = false;
       if (state != State.BUFFERING) return null;
-      operationFlags &= ~FLAG_GAP;
 
-      // handle case when no log was even created because no updates
-      // were received.
-      if (tlog == null) {
-        state = State.ACTIVE;
-        return null;
+      synchronized (this) {
+        // handle case when no updates were received.
+        if (bufferTlog == null) {
+          state = State.ACTIVE;
+          return null;
+        }
+        bufferTlog.incref();
       }
-      tlog.incref();
+
       state = State.APPLYING_BUFFERED;
     } finally {
       versionInfo.unblockUpdates();
     }
 
     if (recoveryExecutor.isShutdown()) {
-      tlog.decref();
       throw new RuntimeException("executor is not running...");
     }
     ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
-    LogReplayer replayer = new LogReplayer(Arrays.asList(new TransactionLog[]{tlog}), true);
-    return cs.submit(replayer, recoveryInfo);
+    LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
+    return cs.submit(() -> {
+      replayer.run();
+      dropBufferTlog();
+    }, recoveryInfo);
   }
 
   public State getState() {
@@ -1903,10 +1943,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         if (!activeLog) {
           // if we are replaying an old tlog file, we need to add a commit to the end
           // so we don't replay it again if we restart right after.
-
-          // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it
-          // as the flag on the last operation.
-          translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK));
+          translog.writeCommit(cmd);
         }
 
         try {
@@ -2037,10 +2074,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     return cmd;
   }
   
-  public void cancelApplyBufferedUpdates() {
-    this.cancelApplyBufferUpdate = true;
-  }
-
   ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
       Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
       new DefaultSolrThreadFactory("recoveryExecutor"));
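
The UpdateLog changes above also decide "did this node die while buffering?" by looking for leftover buffer tlog files rather than by inspecting FLAG_GAP on the last tlog record: getBufferLogList() filters the tlog directory by the buffer.tlog. prefix, init() sets existOldBufferLog if anything matches, and bufferUpdates() deletes the leftovers before buffering restarts. A small standalone sketch of that prefix scan follows; BUFFER_TLOG_NAME and the filename filter mirror the patch, while the directory path and class name are made up for illustration.

import java.io.File;

/** Standalone sketch of the buffer-tlog prefix scan; only the constant and filter mirror the patch. */
public class BufferLogScanSketch {
  static final String BUFFER_TLOG_NAME = "buffer.tlog";

  static String[] bufferLogList(File dir) {
    final String prefix = BUFFER_TLOG_NAME + '.';
    return dir.list((d, name) -> name.startsWith(prefix));   // null if dir does not exist
  }

  public static void main(String[] args) {
    File tlogDir = new File("/tmp/solr-tlog-example");        // hypothetical tlog directory
    String[] old = bufferLogList(tlogDir);
    // Mirrors UpdateLog.init(): a leftover buffer tlog means the node died while buffering,
    // so peer sync should be skipped in favour of full recovery.
    boolean existOldBufferLog = old != null && old.length != 0;
    System.out.println("existOldBufferLog=" + existOldBufferLog);
  }
}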

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2a6f4862/solr/core/src/test/org/apache/solr/search/TestRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 1d62207..1b79cee 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -24,7 +24,9 @@ import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.util.TimeOut;
 import org.noggit.ObjectBuilder;
 
 import org.slf4j.Logger;
@@ -820,6 +822,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
           +"]"
       );
 
+      // Note that v101-v103 are dropped, therefore they are not present in RTG
       assertJQ(req("qt","/get", "getVersions","6")
           ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
       );
@@ -929,7 +932,6 @@ public class TestRecovery extends SolrTestCaseJ4 {
           ,"=={'versions':["+v105+","+v104+"]}"
       );
 
-      // this time add some docs first before buffering starts (so tlog won't be at pos 0)
       updateJ(jsonAdd(sdoc("id","c100", "_version_",v200)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
       updateJ(jsonAdd(sdoc("id","c101", "_version_",v201)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
 
@@ -957,10 +959,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
 +""              +"]"
       );
 
-      // The updates that were buffered (but never applied) still appear in recent versions!
-      // This is good for some uses, but may not be good for others.
-      assertJQ(req("qt","/get", "getVersions","11")
-          ,"=={'versions':["+String.join(",",v206,v205,v204,v203,v201,v200,v105,v104,v103,v102,v101)+"]}"
+      assertJQ(req("qt","/get", "getVersions","6")
+          ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
       );
 
      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
@@ -1008,13 +1008,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
 
 
     @Test
-  public void testBufferingFlags() throws Exception {
+  public void testExistOldBufferLog() throws Exception {
 
     DirectUpdateHandler2.commitOnClose = false;
-    final Semaphore logReplayFinish = new Semaphore(0);
-
-      UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();
-
 
     SolrQueryRequest req = req();
     UpdateHandler uhandler = req.getCore().getUpdateHandler();
@@ -1024,9 +1020,6 @@ public class TestRecovery extends SolrTestCaseJ4 {
       String v101 = getNextVersion();
       String v102 = getNextVersion();
       String v103 = getNextVersion();
-      String v114 = getNextVersion();
-      String v115 = getNextVersion();
-      String v116 = getNextVersion();
       String v117 = getNextVersion();
       
       clearIndex();
@@ -1049,30 +1042,10 @@ public class TestRecovery extends SolrTestCaseJ4 {
       uhandler = req.getCore().getUpdateHandler();
       ulog = uhandler.getUpdateLog();
 
-      logReplayFinish.acquire();  // wait for replay to finish
-
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);   // since we died while buffering, we should see this last
-
-      //
-      // Try again to ensure that the previous log replay didn't wipe out our flags
-      //
-
-      req.close();
-      h.close();
-      createCore();
-
-      req = req();
-      uhandler = req.getCore().getUpdateHandler();
-      ulog = uhandler.getUpdateLog();
-
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
-
-      // now do some normal non-buffered adds
-      updateJ(jsonAdd(sdoc("id","Q4", "_version_",v114)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-      updateJ(jsonAdd(sdoc("id","Q5", "_version_",v115)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-      updateJ(jsonAdd(sdoc("id","Q6", "_version_",v116)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-      assertU(commit());
+      // the core does not replay updates from the buffer tlog on startup
+      assertTrue(ulog.existOldBufferLog());   // since we died while buffering, we should see this last
 
+      // buffer tlog won't be removed on restart
       req.close();
       h.close();
       createCore();
@@ -1081,10 +1054,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
       uhandler = req.getCore().getUpdateHandler();
       ulog = uhandler.getUpdateLog();
 
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
+      assertTrue(ulog.existOldBufferLog());
 
       ulog.bufferUpdates();
-      // simulate receiving no updates
       ulog.applyBufferedUpdates();
      updateJ(jsonAdd(sdoc("id","Q7", "_version_",v117)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal
 
@@ -1096,10 +1068,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
       uhandler = req.getCore().getUpdateHandler();
       ulog = uhandler.getUpdateLog();
 
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7
-
-      logReplayFinish.acquire();
-      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+      assertFalse(ulog.existOldBufferLog());
+      // Wait for Q7 to be replayed: it was added to the regular tlog, so it will be replayed on restart
+      TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+      timeout.waitFor("Timeout waiting for replay of updates to finish",
+          () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
+      assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
     } finally {
       DirectUpdateHandler2.commitOnClose = true;
       UpdateLog.testing_logReplayHook = null;
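
With the Semaphore-based testing_logReplayFinishHook gone from this test, it now polls until the UpdateLog reaches State.ACTIVE using Solr's TimeOut helper, as shown in the hunk above. The plain-Java sketch below illustrates the same poll-with-deadline pattern without Solr test dependencies; the method name, sleep interval, and timeout values are illustrative assumptions, not TimeOut's actual implementation.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Plain-Java sketch of the poll-until-ACTIVE wait the updated tests perform via TimeOut. */
public class WaitForReplaySketch {
  static void waitFor(String message, long maxMs, Supplier<Boolean> condition)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxMs);
    while (!condition.get()) {                     // e.g. ulog.getState() == UpdateLog.State.ACTIVE
      if (System.nanoTime() > deadline) throw new TimeoutException(message);
      Thread.sleep(100);                           // poll with a small sleep; Solr's TimeOut does something similar
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Stand-in condition: pretend log replay finishes after ~300ms.
    waitFor("Timeout waiting for replay to finish", 10_000,
        () -> System.currentTimeMillis() - start > 300);
    System.out.println("replay finished; state would now be ACTIVE");
  }
}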

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2a6f4862/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java
index e6bb9a6..1796319 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.hdfs.HdfsTestUtil;
 import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.solr.update.HdfsUpdateLog;
@@ -51,6 +52,7 @@ import org.apache.solr.update.UpdateHandler;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
 import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.apache.solr.util.TimeOut;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -515,13 +517,9 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
 
 
   @Test
-  public void testBufferingFlags() throws Exception {
+  public void testExistOldBufferLog() throws Exception {
 
     DirectUpdateHandler2.commitOnClose = false;
-    final Semaphore logReplayFinish = new Semaphore(0);
-
-    UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();
-
 
     SolrQueryRequest req = req();
     UpdateHandler uhandler = req.getCore().getUpdateHandler();
@@ -548,14 +546,10 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
       uhandler = req.getCore().getUpdateHandler();
       ulog = uhandler.getUpdateLog();
 
-      logReplayFinish.acquire();  // wait for replay to finish
-
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);   // since we died while buffering, we should see this last
-
-      //
-      // Try again to ensure that the previous log replay didn't wipe out our flags
-      //
+      // the core no longer replays updates from the buffer tlog on startup
+      assertTrue(ulog.existOldBufferLog());   // since we died while buffering, we should see this last
 
+      // buffer tlog won't be removed on restart
       req.close();
       h.close();
       createCore();
@@ -564,23 +558,7 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
       uhandler = req.getCore().getUpdateHandler();
       ulog = uhandler.getUpdateLog();
 
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
-
-      // now do some normal non-buffered adds
-      updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-      updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-      updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-      assertU(commit());
-
-      req.close();
-      h.close();
-      createCore();
-
-      req = req();
-      uhandler = req.getCore().getUpdateHandler();
-      ulog = uhandler.getUpdateLog();
-
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
+      assertTrue(ulog.existOldBufferLog());
 
       ulog.bufferUpdates();
       // simulate receiving no updates
@@ -595,10 +573,12 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
       uhandler = req.getCore().getUpdateHandler();
       ulog = uhandler.getUpdateLog();
 
-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7
-
-      logReplayFinish.acquire();
-      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+      assertFalse(ulog.existOldBufferLog());
+      // Wait for Q7 to be replayed: it was added to the regular tlog, so it will be replayed on restart
+      TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+      timeout.waitFor("Timeout waiting for replay of updates to finish",
+          () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
+      assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
     } finally {
       DirectUpdateHandler2.commitOnClose = true;
       UpdateLog.testing_logReplayHook = null;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2a6f4862/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
index 1bf4ad4..d2b4b26 100644
--- a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
+++ b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
@@ -35,7 +35,7 @@ public class TransactionLogTest extends LuceneTestCase {
       transactionLog.lastAddSize = 2000000000;
       AddUpdateCommand updateCommand = new AddUpdateCommand(null);
       updateCommand.solrDoc = new SolrInputDocument();
-      transactionLog.write(updateCommand, 0);
+      transactionLog.write(updateCommand);
     }
   }
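
The TransactionLogTest hunk above is the mechanical side of the change: with operation flags gone, TransactionLog's write methods no longer take an int flags argument, so call sites shrink from write(updateCommand, 0) to write(updateCommand). The toy interfaces below only illustrate that call-site shape; they are not Solr's TransactionLog API.

/** Toy interfaces (not Solr's TransactionLog) showing the call-site change exercised above. */
public class WriteSignatureSketch {
  interface OldLog { long write(String cmd, int operationFlags); }   // pre-patch shape
  interface NewLog { long write(String cmd); }                       // post-patch shape

  public static void main(String[] args) {
    OldLog before = (cmd, flags) -> { System.out.println("old: " + cmd + " flags=" + flags); return 0L; };
    NewLog after = cmd -> { System.out.println("new: " + cmd); return 0L; };
    before.write("add doc", 0);    // callers used to pass operationFlags such as FLAG_GAP
    after.write("add doc");        // flags are gone now that buffering has its own tlog
  }
}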
 

