lucene-commits mailing list archives

From: da...@apache.org
Subject: [2/2] lucene-solr:master: SOLR-9922: Write buffering updates to another tlog
Date: Mon, 04 Jun 2018 04:32:58 GMT
SOLR-9922: Write buffering updates to another tlog


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ab316bbc
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ab316bbc
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ab316bbc

Branch: refs/heads/master
Commit: ab316bbc91c273b13c851a38ad5d14ef64ab3eec
Parents: 3dc4fa1
Author: Cao Manh Dat <datcm@apache.org>
Authored: Mon Jun 4 11:32:31 2018 +0700
Committer: Cao Manh Dat <datcm@apache.org>
Committed: Mon Jun 4 11:32:31 2018 +0700

----------------------------------------------------------------------
 SOLR-9922.patch                                 | 1294 ++++++++++++++++++
 solr/CHANGES.txt                                |    2 +
 .../org/apache/solr/cloud/RecoveryStrategy.java |   29 +-
 .../apache/solr/cloud/ReplicateFromLeader.java  |    2 +-
 .../apache/solr/update/CdcrTransactionLog.java  |   20 +-
 .../org/apache/solr/update/CdcrUpdateLog.java   |    3 -
 .../apache/solr/update/HdfsTransactionLog.java  |   18 +-
 .../org/apache/solr/update/HdfsUpdateLog.java   |   84 +-
 .../org/apache/solr/update/TransactionLog.java  |   56 +-
 .../java/org/apache/solr/update/UpdateLog.java  |  255 ++--
 .../org/apache/solr/search/TestRecovery.java    |   58 +-
 .../apache/solr/search/TestRecoveryHdfs.java    |   46 +-
 .../apache/solr/update/TransactionLogTest.java  |    2 +-
 13 files changed, 1553 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
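
For orientation before the diffs: previously, buffered updates were written into the
current tlog and marked with FLAG_GAP so replay and peersync knew a gap existed; this
commit writes them to a dedicated buffer tlog file instead. The sketch below is a
minimal stand-in for that routing, not the Solr types in the patch -- the class name
BufferingSketch and the flag value are assumptions; the real logic is in
org.apache.solr.update.UpdateLog.add and existOldBufferLog, shown in the diffs.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.solr.update.UpdateLog. The real class writes
// TransactionLog files named tlog.* and, after this commit, buffer.tlog.*.
class BufferingSketch {
  static final int BUFFERING = 0x01; // assumed value; mirrors UpdateCommand.BUFFERING

  private List<String> tlog;       // current tlog (stand-in)
  private List<String> bufferTlog; // new: separate log for buffered updates

  synchronized void add(String cmd, int flags) {
    if ((flags & BUFFERING) != 0) {
      ensureBufferTlog();
      bufferTlog.add(cmd); // patch: bufferTlog.write(cmd)
      return;              // buffered updates never touch the current tlog
    }
    ensureLog();
    tlog.add(cmd);         // normal updates keep going to the current tlog
  }

  // Startup check replacing (startingOperation & FLAG_GAP) != 0: a leftover
  // buffer tlog on disk means we previously died while buffering.
  boolean existOldBufferLog() {
    return bufferTlog != null && !bufferTlog.isEmpty();
  }

  private void ensureBufferTlog() {
    if (bufferTlog == null) bufferTlog = new ArrayList<>();
  }

  private void ensureLog() {
    if (tlog == null) tlog = new ArrayList<>();
  }
}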


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/SOLR-9922.patch
----------------------------------------------------------------------
diff --git a/SOLR-9922.patch b/SOLR-9922.patch
new file mode 100644
index 0000000..052abf4
--- /dev/null
+++ b/SOLR-9922.patch
@@ -0,0 +1,1294 @@
+diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+index c8f5ae8..966497b 100644
+--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
++++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+@@ -449,7 +449,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
+ 
+   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
+   final public void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
+-    boolean replayed = false;
+     boolean successfulRecovery = false;
+ 
+     UpdateLog ulog;
+@@ -500,8 +499,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
+       // when we went down.  We may have received updates since then.
+       recentVersions = startingVersions;
+       try {
+-        if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
+-          // last operation at the time of startup had the GAP flag set...
++        if (ulog.existOldBufferLog()) {
+           // this means we were previously doing a full index replication
+           // that probably didn't complete and buffering updates in the
+           // meantime.
+@@ -542,9 +540,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
+         }
+ 
+         LOG.info("Begin buffering updates. core=[{}]", coreName);
++        // calling bufferUpdates() again will drop the old buffer tlog
+         ulog.bufferUpdates();
+-        replayed = false;
+-        
++
+         LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
+             ourUrl);
+         zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+@@ -603,8 +601,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
+             
+             LOG.info("Replaying updates buffered during PeerSync.");
+             replay(core);
+-            replayed = true;
+-            
++
+             // sync success
+             successfulRecovery = true;
+             return;
+@@ -630,8 +627,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
+           }
+ 
+           replayFuture = replay(core);
+-          replayed = true;
+-          
++
+           if (isClosed()) {
+             LOG.info("RecoveryStrategy has been closed");
+             break;
+@@ -650,21 +646,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
+       } catch (Exception e) {
+         SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
+       } finally {
+-        if (!replayed) {
+-          // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
+-          // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
+-          // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
+-          // reset our starting point for playback.
+-          LOG.info("Replay not started, or was not successful... still buffering updates.");
+-
+-          /** this prev code is retained in case we want to switch strategies.
+-          try {
+-            ulog.dropBufferedUpdates();
+-          } catch (Exception e) {
+-            SolrException.log(log, "", e);
+-          }
+-          **/
+-        }
+         if (successfulRecovery) {
+           LOG.info("Registering as Active after recovery.");
+           try {
+diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+index 0a742e3..aa648dd 100644
+--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
++++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+@@ -97,7 +97,7 @@ public class ReplicateFromLeader {
+                 new ModifiableSolrParams());
+             CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+             cuc.setVersion(Long.parseLong(commitVersion));
+-            updateLog.copyOverOldUpdates(cuc);
++            updateLog.commitAndSwitchToNewTlog(cuc);
+             lastVersion = Long.parseLong(commitVersion);
+           }
+         });
+diff --git a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
+index 3534f62..f668540 100644
+--- a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
++++ b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
+@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
+  * methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}.</li>
+  * <li>encode the number of records in the tlog file in the last commit record. The number of records will be
+  * decoded and reuse if the tlog file is reopened. This is achieved by extending the constructor, and the
+- * methods {@link #writeCommit(CommitUpdateCommand, int)} and {@link #getReader(long)}.</li>
++ * methods {@link #writeCommit(CommitUpdateCommand)} and {@link #getReader(long)}.</li>
+  * </ul>
+  */
+ public class CdcrTransactionLog extends TransactionLog {
+@@ -108,7 +108,7 @@ public class CdcrTransactionLog extends TransactionLog {
+   }
+ 
+   @Override
+-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
++  public long write(AddUpdateCommand cmd, long prevPointer) {
+     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
+ 
+     LogCodec codec = new LogCodec(resolver);
+@@ -125,7 +125,7 @@ public class CdcrTransactionLog extends TransactionLog {
+       codec.init(out);
+       if (cmd.isInPlaceUpdate()) {
+         codec.writeTag(JavaBinCodec.ARR, 6);
+-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags);  // should just take one byte
++        codec.writeInt(UpdateLog.UPDATE_INPLACE);  // should just take one byte
+         codec.writeLong(cmd.getVersion());
+         codec.writeLong(prevPointer);
+         codec.writeLong(cmd.prevVersion);
+@@ -141,7 +141,7 @@ public class CdcrTransactionLog extends TransactionLog {
+ 
+       } else {
+         codec.writeTag(JavaBinCodec.ARR, 4);
+-        codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
++        codec.writeInt(UpdateLog.ADD);  // should just take one byte
+         codec.writeLong(cmd.getVersion());
+         if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
+           // if the update is received via cdcr source; add extra boolean entry
+@@ -179,7 +179,7 @@ public class CdcrTransactionLog extends TransactionLog {
+   }
+ 
+   @Override
+-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
++  public long writeDelete(DeleteUpdateCommand cmd) {
+     LogCodec codec = new LogCodec(resolver);
+ 
+     try {
+@@ -190,7 +190,7 @@ public class CdcrTransactionLog extends TransactionLog {
+       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
+       codec.init(out);
+       codec.writeTag(JavaBinCodec.ARR, 4);
+-      codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
++      codec.writeInt(UpdateLog.DELETE);  // should just take one byte
+       codec.writeLong(cmd.getVersion());
+       codec.writeByteArray(br.bytes, br.offset, br.length);
+       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
+@@ -217,7 +217,7 @@ public class CdcrTransactionLog extends TransactionLog {
+   }
+ 
+   @Override
+-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
++  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+     LogCodec codec = new LogCodec(resolver);
+     try {
+       checkWriteHeader(codec, null);
+@@ -225,7 +225,7 @@ public class CdcrTransactionLog extends TransactionLog {
+       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
+       codec.init(out);
+       codec.writeTag(JavaBinCodec.ARR, 4);
+-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
++      codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
+       codec.writeLong(cmd.getVersion());
+       codec.writeStr(cmd.query);
+       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
+@@ -249,7 +249,7 @@ public class CdcrTransactionLog extends TransactionLog {
+   }
+ 
+   @Override
+-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
++  public long writeCommit(CommitUpdateCommand cmd) {
+     LogCodec codec = new LogCodec(resolver);
+     synchronized (this) {
+       try {
+@@ -261,7 +261,7 @@ public class CdcrTransactionLog extends TransactionLog {
+         }
+         codec.init(fos);
+         codec.writeTag(JavaBinCodec.ARR, 4);
+-        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
++        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
+         codec.writeLong(cmd.getVersion());
+         codec.writeTag(JavaBinCodec.INT); // Enforce the encoding of a plain integer, to simplify decoding
+         fos.writeInt(numRecords + 1); // the number of records in the file - +1 to account for the commit operation being written
+diff --git a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
+index 6b20204..bff1612 100644
+--- a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
++++ b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
+@@ -352,7 +352,6 @@ public class CdcrUpdateLog extends UpdateLog {
+     long latestVersion = startingUpdates.getMaxRecentVersion();
+     try {
+       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
+-      startingOperation = startingUpdates.getLatestOperation();
+ 
+       // populate recent deletes list (since we can't get that info from the index)
+       for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
+@@ -389,9 +388,7 @@ public class CdcrUpdateLog extends UpdateLog {
+    */
+   private void copyBufferedUpdates(File tlogSrc, long offsetSrc, long latestVersion) {
+     recoveryInfo = new RecoveryInfo();
+-    recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
+     state = State.BUFFERING;
+-    operationFlags |= FLAG_GAP;
+ 
+     ModifiableSolrParams params = new ModifiableSolrParams();
+     params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
+diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+index 0f89016..8ed7d7a 100644
+--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
++++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+@@ -166,20 +166,6 @@ public class HdfsTransactionLog extends TransactionLog {
+     }
+     return true;
+   }
+-  
+-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
+-  // This should only be used to roll back buffered updates, not actually applied updates.
+-  @Override
+-  public void rollback(long pos) throws IOException {
+-    synchronized (this) {
+-      assert snapshot_size == pos;
+-      ensureFlushed();
+-      // TODO: how do we rollback with hdfs?? We need HDFS-3107
+-      fos.setWritten(pos);
+-      assert fos.size() == pos;
+-      numRecords = snapshot_numRecords;
+-    }
+-  }
+ 
+   private void readHeader(FastInputStream fis) throws IOException {
+     // read existing header
+@@ -210,7 +196,7 @@ public class HdfsTransactionLog extends TransactionLog {
+   }
+ 
+   @Override
+-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
++  public long writeCommit(CommitUpdateCommand cmd) {
+     LogCodec codec = new LogCodec(resolver);
+     synchronized (this) {
+       try {
+@@ -223,7 +209,7 @@ public class HdfsTransactionLog extends TransactionLog {
+         
+         codec.init(fos);
+         codec.writeTag(JavaBinCodec.ARR, 3);
+-        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
++        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
+         codec.writeLong(cmd.getVersion());
+         codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
+ 
+diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
+index 7bb74d0..8ca4b1c 100644
+--- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
++++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
+@@ -65,37 +65,6 @@ public class HdfsUpdateLog extends UpdateLog {
+     this.confDir = confDir;
+   }
+   
+-  // HACK
+-  // while waiting for HDFS-3107, instead of quickly
+-  // dropping, we slowly apply
+-  // This is somewhat brittle, but current usage
+-  // allows for it
+-  @Override
+-  public boolean dropBufferedUpdates() {
+-    versionInfo.blockUpdates();
+-    try {
+-      if (state != State.BUFFERING) return false;
+-      
+-      if (log.isInfoEnabled()) {
+-        log.info("Dropping buffered updates " + this);
+-      }
+-      
+-      // since we blocked updates, this synchronization shouldn't strictly be
+-      // necessary.
+-      synchronized (this) {
+-        if (tlog != null) {
+-          // tlog.rollback(recoveryInfo.positionOfStart);
+-        }
+-      }
+-      
+-      state = State.ACTIVE;
+-      operationFlags &= ~FLAG_GAP;
+-    } finally {
+-      versionInfo.unblockUpdates();
+-    }
+-    return true;
+-  }
+-  
+   @Override
+   public void init(PluginInfo info) {
+     super.init(info);
+@@ -186,6 +155,11 @@ public class HdfsUpdateLog extends UpdateLog {
+         throw new RuntimeException("Problem creating directory: " + tlogDir, e);
+       }
+     }
++
++    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
++    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
++      existOldBufferLog = true;
++    }
+     
+     tlogFiles = getLogList(fs, tlogDir);
+     id = getLastLogId() + 1; // add 1 since we will create a new log for the
+@@ -241,7 +215,6 @@ public class HdfsUpdateLog extends UpdateLog {
+     // non-complete tlogs.
+     try (RecentUpdates startingUpdates = getRecentUpdates()) {
+       startingVersions = startingUpdates.getVersions(getNumRecordsToKeep());
+-      startingOperation = startingUpdates.getLatestOperation();
+ 
+       // populate recent deletes list (since we can't get that info from the
+       // index)
+@@ -269,6 +242,23 @@ public class HdfsUpdateLog extends UpdateLog {
+   public String getLogDir() {
+     return tlogDir.toUri().toString();
+   }
++
++  public static String[] getBufferLogList(FileSystem fs, Path tlogDir) {
++    final String prefix = BUFFER_TLOG_NAME+'.';
++    assert fs != null;
++    FileStatus[] fileStatuses;
++    try {
++      fileStatuses = fs.listStatus(tlogDir, path -> path.getName().startsWith(prefix));
++    } catch (IOException e) {
++      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to list old buffer tlogs", e);
++    }
++
++    String[] names = new String[fileStatuses.length];
++    for (int i = 0; i < fileStatuses.length; i++) {
++      names[i] = fileStatuses[i].getPath().getName();
++    }
++    return names;
++  }
+   
+   public static String[] getLogList(FileSystem fs, Path tlogDir) {
+     final String prefix = TLOG_NAME + '.';
+@@ -307,7 +297,35 @@ public class HdfsUpdateLog extends UpdateLog {
+       IOUtils.closeQuietly(fs);
+     }
+   }
+-  
++
++  @Override
++  protected void ensureBufferTlog() {
++    if (bufferTlog != null) return;
++    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
++    bufferTlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName),
++        globalStrings, tlogDfsReplication);
++  }
++
++  @Override
++  protected void deleteBufferLogs() {
++    // Delete old buffer logs
++    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
++    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
++      for (String oldBufferLogName : oldBufferTlog) {
++        Path f = new Path(tlogDir, oldBufferLogName);
++        try {
++          boolean s = fs.delete(f, false);
++          if (!s) {
++            log.error("Could not remove old buffer tlog file:" + f);
++          }
++        } catch (IOException e) {
++          // No need to bubble up this exception; it won't cause any problems during recovery
++          log.error("Could not remove old buffer tlog file:" + f, e);
++        }
++      }
++    }
++  }
++
+   @Override
+   protected void ensureLog() {
+     if (tlog == null) {
+diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+index 96a928c..2a23896 100644
+--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
++++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+@@ -85,9 +85,6 @@ public class TransactionLog implements Closeable {
+   Map<String,Integer> globalStringMap = new HashMap<>();
+   List<String> globalStringList = new ArrayList<>();
+ 
+-  long snapshot_size;
+-  int snapshot_numRecords;
+-
+   // write a BytesRef as a byte array
+   static final JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
+     @Override
+@@ -153,7 +150,7 @@ public class TransactionLog implements Closeable {
+ 
+       // Parse tlog id from the filename
+       String filename = tlogFile.getName();
+-      id = Long.parseLong(filename.substring(filename.indexOf('.') + 1, filename.indexOf('.') + 20));
++      id = Long.parseLong(filename.substring(filename.lastIndexOf('.')+1));
+ 
+       this.tlogFile = tlogFile;
+       raf = new RandomAccessFile(this.tlogFile, "rw");
+@@ -233,29 +230,6 @@ public class TransactionLog implements Closeable {
+     return true;
+   }
+ 
+-  /** takes a snapshot of the current position and number of records
+-   * for later possible rollback, and returns the position */
+-  public long snapshot() {
+-    synchronized (this) {
+-      snapshot_size = fos.size();
+-      snapshot_numRecords = numRecords;
+-      return snapshot_size;
+-    }
+-  }
+-
+-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
+-  // This should only be used to roll back buffered updates, not actually applied updates.
+-  public void rollback(long pos) throws IOException {
+-    synchronized (this) {
+-      assert snapshot_size == pos;
+-      fos.flush();
+-      raf.setLength(pos);
+-      fos.setWritten(pos);
+-      assert fos.size() == pos;
+-      numRecords = snapshot_numRecords;
+-    }
+-  }
+-
+   public long writeData(Object o) {
+     @SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver);
+     try {
+@@ -346,17 +320,16 @@ public class TransactionLog implements Closeable {
+ 
+   /**
+    * Writes an add update command to the transaction log. This is not applicable for
+-   * in-place updates; use {@link #write(AddUpdateCommand, long, int)}.
++   * in-place updates; use {@link #write(AddUpdateCommand, long)}.
+    * (The previous pointer (applicable for in-place updates) is set to -1 while writing
+    * the command to the transaction log.)
+    * @param cmd The add update command to be written
+-   * @param flags Options for writing the command to the transaction log
+    * @return Returns the position pointer of the written update command
+    * 
+-   * @see #write(AddUpdateCommand, long, int)
++   * @see #write(AddUpdateCommand, long)
+    */
+-  public long write(AddUpdateCommand cmd, int flags) {
+-    return write(cmd, -1, flags);
++  public long write(AddUpdateCommand cmd) {
++    return write(cmd, -1);
+   }
+ 
+   /**
+@@ -365,10 +338,9 @@ public class TransactionLog implements Closeable {
+    * @param cmd The add update command to be written
+    * @param prevPointer The pointer in the transaction log which this update depends 
+    * on (applicable for in-place updates)
+-   * @param flags Options for writing the command to the transaction log
+    * @return Returns the position pointer of the written update command
+    */
+-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
++  public long write(AddUpdateCommand cmd, long prevPointer) {
+     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
+     
+     LogCodec codec = new LogCodec(resolver);
+@@ -386,14 +358,14 @@ public class TransactionLog implements Closeable {
+       codec.init(out);
+       if (cmd.isInPlaceUpdate()) {
+         codec.writeTag(JavaBinCodec.ARR, 5);
+-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags);  // should just take one byte
++        codec.writeInt(UpdateLog.UPDATE_INPLACE);  // should just take one byte
+         codec.writeLong(cmd.getVersion());
+         codec.writeLong(prevPointer);
+         codec.writeLong(cmd.prevVersion);
+         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+       } else {
+         codec.writeTag(JavaBinCodec.ARR, 3);
+-        codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
++        codec.writeInt(UpdateLog.ADD);  // should just take one byte
+         codec.writeLong(cmd.getVersion());
+         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+       }
+@@ -422,7 +394,7 @@ public class TransactionLog implements Closeable {
+     }
+   }
+ 
+-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
++  public long writeDelete(DeleteUpdateCommand cmd) {
+     LogCodec codec = new LogCodec(resolver);
+ 
+     try {
+@@ -433,7 +405,7 @@ public class TransactionLog implements Closeable {
+       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
+       codec.init(out);
+       codec.writeTag(JavaBinCodec.ARR, 3);
+-      codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
++      codec.writeInt(UpdateLog.DELETE);  // should just take one byte
+       codec.writeLong(cmd.getVersion());
+       codec.writeByteArray(br.bytes, br.offset, br.length);
+ 
+@@ -452,7 +424,7 @@ public class TransactionLog implements Closeable {
+ 
+   }
+ 
+-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
++  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
+     LogCodec codec = new LogCodec(resolver);
+     try {
+       checkWriteHeader(codec, null);
+@@ -460,7 +432,7 @@ public class TransactionLog implements Closeable {
+       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
+       codec.init(out);
+       codec.writeTag(JavaBinCodec.ARR, 3);
+-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
++      codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
+       codec.writeLong(cmd.getVersion());
+       codec.writeStr(cmd.query);
+ 
+@@ -478,7 +450,7 @@ public class TransactionLog implements Closeable {
+   }
+ 
+ 
+-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
++  public long writeCommit(CommitUpdateCommand cmd) {
+     LogCodec codec = new LogCodec(resolver);
+     synchronized (this) {
+       try {
+@@ -490,7 +462,7 @@ public class TransactionLog implements Closeable {
+         }
+         codec.init(fos);
+         codec.writeTag(JavaBinCodec.ARR, 3);
+-        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
++        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
+         codec.writeLong(cmd.getVersion());
+         codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
+ 
+diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+index 7f821ea..1bda23f 100644
+--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
++++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+@@ -96,6 +96,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+   private static final long STATUS_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
+   public static String LOG_FILENAME_PATTERN = "%s.%019d";
+   public static String TLOG_NAME="tlog";
++  public static String BUFFER_TLOG_NAME="buffer.tlog";
+ 
+   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   private boolean debug = log.isDebugEnabled();
+@@ -139,11 +140,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+   public static final int DELETE_BY_QUERY = 0x03;
+   public static final int COMMIT = 0x04;
+   public static final int UPDATE_INPLACE = 0x08;
+-  // Flag indicating that this is a buffered operation, and that a gap exists before buffering started.
+-  // for example, if full index replication starts and we are buffering updates, then this flag should
+-  // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should
+-  // fail if this flag is set on the last update in the tlog).
+-  public static final int FLAG_GAP = 0x10;
++  // Kept for backward compatibility; this field should be removed in 9.0
+   public static final int OPERATION_MASK = 0x0f;  // mask off flags to get the operation
+ 
+   /**
+@@ -186,8 +183,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+ 
+   long id = -1;
+   protected State state = State.ACTIVE;
+-  protected int operationFlags;  // flags to write in the transaction log with operations (i.e. FLAG_GAP)
+ 
++  protected TransactionLog bufferTlog;
+   protected TransactionLog tlog;
+   protected TransactionLog prevTlog;
+   protected final Deque<TransactionLog> logs = new LinkedList<>();  // list of recent logs, newest first
+@@ -206,6 +203,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+   protected int maxNumLogsToKeep;
+   protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
+   protected Long maxVersionFromIndex = null;
++  protected boolean existOldBufferLog = false;
+ 
+   // keep track of deletes only... this is not updated on an add
+   protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
+@@ -244,7 +242,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+   volatile UpdateHandler uhandler;    // a core reload can change this reference!
+   protected volatile boolean cancelApplyBufferUpdate;
+   List<Long> startingVersions;
+-  int startingOperation;  // last operation in the logs on startup
+ 
+   // metrics
+   protected Gauge<Integer> bufferedOpsGauge;
+@@ -378,6 +375,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+       log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
+     }
+ 
++    String[] oldBufferTlog = getBufferLogList(tlogDir);
++    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
++      existOldBufferLog = true;
++    }
+     TransactionLog oldLog = null;
+     for (String oldLogName : tlogFiles) {
+       File f = new File(tlogDir, oldLogName);
+@@ -408,7 +409,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
+     try (RecentUpdates startingUpdates = getRecentUpdates()) {
+       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
+-      startingOperation = startingUpdates.getLatestOperation();
+ 
+       // populate recent deletes list (since we can't get that info from the index)
+       for (int i = startingUpdates.deleteList.size() - 1; i >= 0; i--) {
+@@ -434,14 +434,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     this.metricManager = manager;
+     this.registryName = registry;
+     bufferedOpsGauge = () -> {
++      if (state == State.BUFFERING) {
++        if (bufferTlog == null) return 0;
++        // numRecords counts header as a record
++        return bufferTlog.numRecords() - 1;
++      }
+       if (tlog == null) {
+         return 0;
+       } else if (state == State.APPLYING_BUFFERED) {
+         // numRecords counts header as a record
+         return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors;
+-      } else if (state == State.BUFFERING) {
+-        // numRecords counts header as a record
+-        return tlog.numRecords() - 1;
+       } else {
+         return 0;
+       }
+@@ -472,8 +474,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     return startingVersions;
+   }
+ 
+-  public int getStartingOperation() {
+-    return startingOperation;
++  public boolean existOldBufferLog() {
++    return existOldBufferLog;
+   }
+ 
+   /* Takes over ownership of the log, keeping it until no longer needed
+@@ -509,6 +511,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     logs.addFirst(oldLog);
+   }
+ 
++  public String[] getBufferLogList(File directory) {
++    final String prefix = BUFFER_TLOG_NAME+'.';
++    return directory.list((dir, name) -> name.startsWith(prefix));
++  }
++
++  /**
++   * Is this update from the old tlogs (not from the buffer tlog)?
++   * If so, we must skip writing {@code cmd} to the current tlog.
++   */
++  private boolean updateFromOldTlogs(UpdateCommand cmd) {
++    return (cmd.getFlags() & UpdateCommand.REPLAY) != 0 && state == State.REPLAYING;
++  }
++
+   public String[] getLogList(File directory) {
+     final String prefix = TLOG_NAME+'.';
+     String[] names = directory.list(new FilenameFilter() {
+@@ -541,14 +556,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
+ 
+     synchronized (this) {
+-      long pos = -1;
++      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
++        ensureBufferTlog();
++        bufferTlog.write(cmd);
++        return;
++      }
+ 
++      long pos = -1;
+       long prevPointer = getPrevPointerForUpdate(cmd);
+ 
+       // don't log if we are replaying from another log
+-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
++      if (!updateFromOldTlogs(cmd)) {
+         ensureLog();
+-        pos = tlog.write(cmd, prevPointer, operationFlags);
++        pos = tlog.write(cmd, prevPointer);
+       }
+ 
+       if (!clearCaches) {
+@@ -556,10 +576,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+         // Only currently would be useful for RTG while in recovery mode though.
+         LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer);
+ 
+-        // only update our map if we're not buffering
+-        if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+-          map.put(cmd.getIndexedId(), ptr);
+-        }
++        map.put(cmd.getIndexedId(), ptr);
+ 
+         if (trace) {
+           log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+@@ -606,22 +623,21 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     BytesRef br = cmd.getIndexedId();
+ 
+     synchronized (this) {
+-      long pos = -1;
++      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
++        ensureBufferTlog();
++        bufferTlog.writeDelete(cmd);
++        return;
++      }
+ 
+-      // don't log if we are replaying from another log
+-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
++      long pos = -1;
++      if (!updateFromOldTlogs(cmd)) {
+         ensureLog();
+-        pos = tlog.writeDelete(cmd, operationFlags);
++        pos = tlog.writeDelete(cmd);
+       }
+ 
+       LogPtr ptr = new LogPtr(pos, cmd.version);
+-
+-      // only update our map if we're not buffering
+-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+-        map.put(br, ptr);
+-
+-        oldDeletes.put(br, ptr);
+-      }
++      map.put(br, ptr);
++      oldDeletes.put(br, ptr);
+ 
+       if (trace) {
+         log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+@@ -631,15 +647,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+ 
+   public void deleteByQuery(DeleteUpdateCommand cmd) {
+     synchronized (this) {
++      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
++        ensureBufferTlog();
++        bufferTlog.writeDeleteByQuery(cmd);
++        return;
++      }
++
+       long pos = -1;
+-      // don't log if we are replaying from another log
+-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
++      if (!updateFromOldTlogs(cmd)) {
+         ensureLog();
+-        pos = tlog.writeDeleteByQuery(cmd, operationFlags);
++        pos = tlog.writeDeleteByQuery(cmd);
+       }
+ 
+-      // only change our caches if we are not buffering
+-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
++      // skip purging our caches in the case of a tlog replica
++      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
+         // given that we just did a delete-by-query, we don't know what documents were
+         // affected and hence we must purge our caches.
+         openRealtimeSearcher();
+@@ -802,7 +823,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+       if (prevTlog != null) {
+         // if we made it through the commit, write a commit command to the log
+         // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
+-        prevTlog.writeCommit(cmd, operationFlags);
++        prevTlog.writeCommit(cmd);
+ 
+         addOldLog(prevTlog, true);
+         // the old log list will decref when no longer needed
+@@ -1152,9 +1173,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+   public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
+     versionInfo.blockUpdates();
+     try {
+-      operationFlags &= ~FLAG_GAP;
+-      state = State.ACTIVE;
+-      copyAndSwitchToNewTlog(cuc);
++      synchronized (this) {
++        state = State.ACTIVE;
++        if (bufferTlog == null) {
++          return;
++        }
++        // unlike applyBufferedUpdates(), this does not switch to a new tlog;
++        // switching to a new tlog could lose updates on the next fetch
++        copyOverOldUpdates(cuc.getVersion(), bufferTlog);
++        dropBufferTlog();
++      }
+     } finally {
+       versionInfo.unblockUpdates();
+     }
+@@ -1165,33 +1193,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+    * So any updates which hasn't made it to the index is preserved in the current tlog
+    * @param cuc any updates that have version larger than the version of cuc will be copied over
+    */
+-  public void copyOverOldUpdates(CommitUpdateCommand cuc) {
++  public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
+     versionInfo.blockUpdates();
+     try {
+-      copyAndSwitchToNewTlog(cuc);
++      synchronized (this) {
++        if (tlog == null) {
++          return;
++        }
++        preCommit(cuc);
++        try {
++          copyOverOldUpdates(cuc.getVersion());
++        } finally {
++          postCommit(cuc);
++        }
++      }
+     } finally {
+       versionInfo.unblockUpdates();
+     }
+   }
+ 
+-  protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
+-    synchronized (this) {
+-      if (tlog == null) {
+-        return;
+-      }
+-      preCommit(cuc);
+-      try {
+-        copyOverOldUpdates(cuc.getVersion());
+-      } finally {
+-        postCommit(cuc);
+-      }
+-    }
+-  }
+-
+-  /**
+-   * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
+-   * @param commitVersion any updates that have version larger than the commitVersion will be copied over
+-   */
+   public void copyOverOldUpdates(long commitVersion) {
+     TransactionLog oldTlog = prevTlog;
+     if (oldTlog == null && !logs.isEmpty()) {
+@@ -1207,6 +1227,14 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+       log.warn("Exception reading log", e);
+       return;
+     }
++    copyOverOldUpdates(commitVersion, oldTlog);
++  }
++
++  /**
++   * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
++   * @param commitVersion any update with a version larger than commitVersion will be copied over
++   */
++  public void copyOverOldUpdates(long commitVersion, TransactionLog oldTlog) {
+     copyOverOldUpdatesMeter.mark();
+ 
+     SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
+@@ -1270,6 +1298,22 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     }
+   }
+ 
++  protected void ensureBufferTlog() {
++    if (bufferTlog != null) return;
++    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
++    bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
++  }
++
++  // Cleanup old buffer tlogs
++  protected void deleteBufferLogs() {
++    String[] oldBufferTlog = getBufferLogList(tlogDir);
++    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
++      for (String oldBufferLogName : oldBufferTlog) {
++        deleteFile(new File(tlogDir, oldBufferLogName));
++      }
++    }
++  }
++
+ 
+   protected void ensureLog() {
+     if (tlog == null) {
+@@ -1285,7 +1329,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+         // record a commit
+         log.info("Recording current closed for " + uhandler.core + " log=" + theLog);
+         CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
+-        theLog.writeCommit(cmd, operationFlags);
++        theLog.writeCommit(cmd);
+       }
+ 
+       theLog.deleteOnClose = false;
+@@ -1314,6 +1358,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+         log.forceClose();
+       }
+ 
++      if (bufferTlog != null) {
++        // do not delete bufferTlog on close; an existing bufferTlog signals that peerSync should be skipped
++        bufferTlog.deleteOnClose = false;
++        bufferTlog.decref();
++        bufferTlog.forceClose();
++      }
++
+       try {
+         ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
+       } catch (Exception e) {
+@@ -1347,7 +1398,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     HashMap<Long, Update> updates;
+     List<Update> deleteByQueryList;
+     List<DeleteUpdate> deleteList;
+-    int latestOperation;
+ 
+     public RecentUpdates(Deque<TransactionLog> logList) {
+       this.logList = logList;
+@@ -1401,11 +1451,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+       return result;
+     }
+ 
+-    public int getLatestOperation() {
+-      return latestOperation;
+-    }
+-
+-
+     private void update() {
+       int numUpdates = 0;
+       updateList = new ArrayList<>(logList.size());
+@@ -1431,9 +1476,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+ 
+               // TODO: refactor this out so we get common error handling
+               int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX);
+-              if (latestOperation == 0) {
+-                latestOperation = opAndFlags;
+-              }
+               int oper = opAndFlags & UpdateLog.OPERATION_MASK;
+               long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+ 
+@@ -1525,6 +1567,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+         tlog.incref();
+         logList.addFirst(tlog);
+       }
++      if (bufferTlog != null) {
++        bufferTlog.incref();
++        logList.addFirst(bufferTlog);
++      }
+     }
+ 
+     // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
+@@ -1542,13 +1588,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     // reading state and acting on it in the distributed update processor
+     versionInfo.blockUpdates();
+     try {
+-      if (state == State.BUFFERING) {
+-        log.info("Restarting buffering. previous=" + recoveryInfo);
+-      } else if (state != State.ACTIVE) {
++      if (state != State.ACTIVE && state != State.BUFFERING) {
+         // we don't currently have support for handling other states
+         log.warn("Unexpected state for bufferUpdates: " + state + ", Ignoring request.");
+         return;
+       }
++      dropBufferTlog();
++      deleteBufferLogs();
+ 
+       recoveryInfo = new RecoveryInfo();
+ 
+@@ -1556,15 +1602,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+         log.info("Starting to buffer updates. " + this);
+       }
+ 
+-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
+-      synchronized (this) {
+-        recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
+-      }
+-
+       state = State.BUFFERING;
+-
+-      // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates
+-      operationFlags |= FLAG_GAP;
+     } finally {
+       versionInfo.unblockUpdates();
+     }
+@@ -1580,25 +1618,24 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+         log.info("Dropping buffered updates " + this);
+       }
+ 
+-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
+-      synchronized (this) {
+-        if (tlog != null) {
+-          tlog.rollback(recoveryInfo.positionOfStart);
+-        }
+-      }
++      dropBufferTlog();
+ 
+       state = State.ACTIVE;
+-      operationFlags &= ~FLAG_GAP;
+-    } catch (IOException e) {
+-      SolrException.log(log,"Error attempting to roll back log", e);
+-      return false;
+-    }
+-    finally {
++    } finally {
+       versionInfo.unblockUpdates();
+     }
+     return true;
+   }
+ 
++  private void dropBufferTlog() {
++    synchronized (this) {
++      if (bufferTlog != null) {
++        bufferTlog.decref();
++        bufferTlog = null;
++      }
++    }
++  }
++
+ 
+   /** Returns the Future to wait on, or null if no replay was needed */
+   public Future<RecoveryInfo> applyBufferedUpdates() {
+@@ -1612,27 +1649,30 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     try {
+       cancelApplyBufferUpdate = false;
+       if (state != State.BUFFERING) return null;
+-      operationFlags &= ~FLAG_GAP;
+ 
+-      // handle case when no log was even created because no updates
+-      // were received.
+-      if (tlog == null) {
+-        state = State.ACTIVE;
+-        return null;
++      synchronized (this) {
++        // handle case when no updates were received.
++        if (bufferTlog == null) {
++          state = State.ACTIVE;
++          return null;
++        }
++        bufferTlog.incref();
+       }
+-      tlog.incref();
++
+       state = State.APPLYING_BUFFERED;
+     } finally {
+       versionInfo.unblockUpdates();
+     }
+ 
+     if (recoveryExecutor.isShutdown()) {
+-      tlog.decref();
+       throw new RuntimeException("executor is not running...");
+     }
+     ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+-    LogReplayer replayer = new LogReplayer(Arrays.asList(new TransactionLog[]{tlog}), true);
+-    return cs.submit(replayer, recoveryInfo);
++    LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
++    return cs.submit(() -> {
++      replayer.run();
++      dropBufferTlog();
++    }, recoveryInfo);
+   }
+ 
+   public State getState() {
+@@ -1903,10 +1943,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+         if (!activeLog) {
+           // if we are replaying an old tlog file, we need to add a commit to the end
+           // so we don't replay it again if we restart right after.
+-
+-          // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it
+-          // as the flag on the last operation.
+-          translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK));
++          translog.writeCommit(cmd);
+         }
+ 
+         try {
+@@ -2037,10 +2074,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
+     return cmd;
+   }
+   
+-  public void cancelApplyBufferedUpdates() {
+-    this.cancelApplyBufferUpdate = true;
+-  }
+-
+   ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
+       Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+       new DefaultSolrThreadFactory("recoveryExecutor"));
+diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+index 1d62207..1b79cee 100644
+--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
++++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+@@ -24,7 +24,9 @@ import com.codahale.metrics.Gauge;
+ import com.codahale.metrics.Meter;
+ import com.codahale.metrics.Metric;
+ import com.codahale.metrics.MetricRegistry;
++import org.apache.solr.common.util.TimeSource;
+ import org.apache.solr.metrics.SolrMetricManager;
++import org.apache.solr.util.TimeOut;
+ import org.noggit.ObjectBuilder;
+ 
+ import org.slf4j.Logger;
+@@ -820,6 +822,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
+           +"]"
+       );
+ 
++      // Note that v101-v103 were dropped, so they are not present in RTG
+       assertJQ(req("qt","/get", "getVersions","6")
+           ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
+       );
+@@ -929,7 +932,6 @@ public class TestRecovery extends SolrTestCaseJ4 {
+           ,"=={'versions':["+v105+","+v104+"]}"
+       );
+ 
+-      // this time add some docs first before buffering starts (so tlog won't be at pos 0)
+       updateJ(jsonAdd(sdoc("id","c100", "_version_",v200)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+       updateJ(jsonAdd(sdoc("id","c101", "_version_",v201)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+ 
+@@ -957,10 +959,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
+ +""              +"]"
+       );
+ 
+-      // The updates that were buffered (but never applied) still appear in recent versions!
+-      // This is good for some uses, but may not be good for others.
+-      assertJQ(req("qt","/get", "getVersions","11")
+-          ,"=={'versions':["+String.join(",",v206,v205,v204,v203,v201,v200,v105,v104,v103,v102,v101)+"]}"
++      assertJQ(req("qt","/get", "getVersions","6")
++          ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
+       );
+ 
+       assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+@@ -1008,13 +1008,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
+ 
+ 
+     @Test
+-  public void testBufferingFlags() throws Exception {
++  public void testExistOldBufferLog() throws Exception {
+ 
+     DirectUpdateHandler2.commitOnClose = false;
+-    final Semaphore logReplayFinish = new Semaphore(0);
+-
+-      UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();
+-
+ 
+     SolrQueryRequest req = req();
+     UpdateHandler uhandler = req.getCore().getUpdateHandler();
+@@ -1024,9 +1020,6 @@ public class TestRecovery extends SolrTestCaseJ4 {
+       String v101 = getNextVersion();
+       String v102 = getNextVersion();
+       String v103 = getNextVersion();
+-      String v114 = getNextVersion();
+-      String v115 = getNextVersion();
+-      String v116 = getNextVersion();
+       String v117 = getNextVersion();
+       
+       clearIndex();
+@@ -1049,30 +1042,10 @@ public class TestRecovery extends SolrTestCaseJ4 {
+       uhandler = req.getCore().getUpdateHandler();
+       ulog = uhandler.getUpdateLog();
+ 
+-      logReplayFinish.acquire();  // wait for replay to finish
+-
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);   // since we died while buffering, we should see this last
+-
+-      //
+-      // Try again to ensure that the previous log replay didn't wipe out our flags
+-      //
+-
+-      req.close();
+-      h.close();
+-      createCore();
+-
+-      req = req();
+-      uhandler = req.getCore().getUpdateHandler();
+-      ulog = uhandler.getUpdateLog();
+-
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
+-
+-      // now do some normal non-buffered adds
+-      updateJ(jsonAdd(sdoc("id","Q4", "_version_",v114)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+-      updateJ(jsonAdd(sdoc("id","Q5", "_version_",v115)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+-      updateJ(jsonAdd(sdoc("id","Q6", "_version_",v116)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+-      assertU(commit());
++      // the core does not replay updates from the buffer tlog on startup
++      assertTrue(ulog.existOldBufferLog());   // since we died while buffering, the old buffer tlog should still exist
+ 
++      // buffer tlog won't be removed on restart
+       req.close();
+       h.close();
+       createCore();
+@@ -1081,10 +1054,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
+       uhandler = req.getCore().getUpdateHandler();
+       ulog = uhandler.getUpdateLog();
+ 
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
++      assertTrue(ulog.existOldBufferLog());
+ 
+       ulog.bufferUpdates();
+-      // simulate receiving no updates
+       ulog.applyBufferedUpdates();
+       updateJ(jsonAdd(sdoc("id","Q7", "_version_",v117)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal
+ 
+@@ -1096,10 +1068,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
+       uhandler = req.getCore().getUpdateHandler();
+       ulog = uhandler.getUpdateLog();
+ 
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7
+-
+-      logReplayFinish.acquire();
+-      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
++      assertFalse(ulog.existOldBufferLog());
++      // Wait for Q7 to be replayed; it was added to the tlog, so it will be replayed on restart
++      TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
++      timeout.waitFor("Timeout waiting for replay of updates to finish",
++          () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
++      assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
+     } finally {
+       DirectUpdateHandler2.commitOnClose = true;
+       UpdateLog.testing_logReplayHook = null;
+diff --git a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java
+index e6bb9a6..1796319 100644
+--- a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java
++++ b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java
+@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
+ import org.apache.solr.SolrTestCaseJ4;
+ import org.apache.solr.cloud.hdfs.HdfsTestUtil;
+ import org.apache.solr.common.util.IOUtils;
++import org.apache.solr.common.util.TimeSource;
+ import org.apache.solr.request.SolrQueryRequest;
+ import org.apache.solr.update.DirectUpdateHandler2;
+ import org.apache.solr.update.HdfsUpdateLog;
+@@ -51,6 +52,7 @@ import org.apache.solr.update.UpdateHandler;
+ import org.apache.solr.update.UpdateLog;
+ import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+ import org.apache.solr.util.BadHdfsThreadsFilter;
++import org.apache.solr.util.TimeOut;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Ignore;
+@@ -515,13 +517,9 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
+ 
+ 
+   @Test
+-  public void testBufferingFlags() throws Exception {
++  public void testExistOldBufferLog() throws Exception {
+ 
+     DirectUpdateHandler2.commitOnClose = false;
+-    final Semaphore logReplayFinish = new Semaphore(0);
+-
+-    UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();
+-
+ 
+     SolrQueryRequest req = req();
+     UpdateHandler uhandler = req.getCore().getUpdateHandler();
+@@ -548,14 +546,10 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
+       uhandler = req.getCore().getUpdateHandler();
+       ulog = uhandler.getUpdateLog();
+ 
+-      logReplayFinish.acquire();  // wait for replay to finish
+-
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);   // since we died while buffering, we should see this last
+-
+-      //
+-      // Try again to ensure that the previous log replay didn't wipe out our flags
+-      //
++      // the core no longer replays updates from the buffer tlog on startup
++      assertTrue(ulog.existOldBufferLog());   // since we died while buffering, the old buffer tlog should still exist
+ 
++      // buffer tlog won't be removed on restart
+       req.close();
+       h.close();
+       createCore();
+@@ -564,23 +558,7 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
+       uhandler = req.getCore().getUpdateHandler();
+       ulog = uhandler.getUpdateLog();
+ 
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
+-
+-      // now do some normal non-buffered adds
+-      updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+-      updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+-      updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+-      assertU(commit());
+-
+-      req.close();
+-      h.close();
+-      createCore();
+-
+-      req = req();
+-      uhandler = req.getCore().getUpdateHandler();
+-      ulog = uhandler.getUpdateLog();
+-
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
++      assertTrue(ulog.existOldBufferLog());
+ 
+       ulog.bufferUpdates();
+       // simulate receiving no updates
+@@ -595,10 +573,12 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
+       uhandler = req.getCore().getUpdateHandler();
+       ulog = uhandler.getUpdateLog();
+ 
+-      assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7
+-
+-      logReplayFinish.acquire();
+-      assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
++      assertFalse(ulog.existOldBufferLog());
++      // Wait for Q7 to be replayed: it was added to the tlog, so it will be replayed on restart
++      TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
++      timeout.waitFor("Timeout waiting for update replay to finish",
++          () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
++      assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
+     } finally {
+       DirectUpdateHandler2.commitOnClose = true;
+       UpdateLog.testing_logReplayHook = null;
+diff --git a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
+index 1bf4ad4..d2b4b26 100644
+--- a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
++++ b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
+@@ -35,7 +35,7 @@ public class TransactionLogTest extends LuceneTestCase {
+       transactionLog.lastAddSize = 2000000000;
+       AddUpdateCommand updateCommand = new AddUpdateCommand(null);
+       updateCommand.solrDoc = new SolrInputDocument();
+-      transactionLog.write(updateCommand, 0);
++      transactionLog.write(updateCommand);
+     }
+   }
+ 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 66d8853..2c2191e 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -325,6 +325,8 @@ Optimizations
   SolrConstantScoreQuery as well.  QWF since v5.4.0 sometimes needlessly internally executed and cached the query.
   Affects ExpandComponent, ChildDocTransformer, CurrencyFieldType, TermsQParser.  (David Smiley)
 
+* SOLR-9922: Write buffering updates to another tlog. (Cao Manh Dat)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index c8f5ae8..966497b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -449,7 +449,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
   final public void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
-    boolean replayed = false;
     boolean successfulRecovery = false;
 
     UpdateLog ulog;
@@ -500,8 +499,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       // when we went down.  We may have received updates since then.
       recentVersions = startingVersions;
       try {
-        if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
-          // last operation at the time of startup had the GAP flag set...
+        if (ulog.existOldBufferLog()) {
           // this means we were previously doing a full index replication
           // that probably didn't complete and buffering updates in the
           // meantime.
@@ -542,9 +540,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
 
         LOG.info("Begin buffering updates. core=[{}]", coreName);
+        // calling bufferUpdates() again will drop the old buffer tlog
         ulog.bufferUpdates();
-        replayed = false;
-        
+
         LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
             ourUrl);
         zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
@@ -603,8 +601,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
             
             LOG.info("Replaying updates buffered during PeerSync.");
             replay(core);
-            replayed = true;
-            
+
             // sync success
             successfulRecovery = true;
             return;
@@ -630,8 +627,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           }
 
           replayFuture = replay(core);
-          replayed = true;
-          
+
           if (isClosed()) {
             LOG.info("RecoveryStrategy has been closed");
             break;
@@ -650,21 +646,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
       } catch (Exception e) {
         SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
       } finally {
-        if (!replayed) {
-          // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
-          // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
-          // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
-          // reset our starting point for playback.
-          LOG.info("Replay not started, or was not successful... still buffering updates.");
-
-          /** this prev code is retained in case we want to switch strategies.
-          try {
-            ulog.dropBufferedUpdates();
-          } catch (Exception e) {
-            SolrException.log(log, "", e);
-          }
-          **/
-        }
         if (successfulRecovery) {
           LOG.info("Registering as Active after recovery.");
           try {

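The hunk above swaps the FLAG_GAP bit-check for a direct test of whether a buffer tlog survived the last run, and drops the `replayed` bookkeeping since bufferUpdates() now discards any stale buffer tlog itself. A minimal, self-contained sketch of the new decision flow (UpdateLogView and RecoveryDecisionSketch are hypothetical stand-ins; only existOldBufferLog() and bufferUpdates() mirror real UpdateLog methods from this commit):

// Hypothetical view of the two UpdateLog methods this commit changes.
interface UpdateLogView {
  boolean existOldBufferLog();
  void bufferUpdates();
}

class RecoveryDecisionSketch {
  private final UpdateLogView ulog;

  RecoveryDecisionSketch(UpdateLogView ulog) { this.ulog = ulog; }

  boolean shouldForceReplication() {
    // Pre-commit: inspect the FLAG_GAP bit of the last tlog operation.
    // Post-commit: a leftover buffer tlog on disk is the signal that a full
    // replication (with buffering) did not complete before the node died.
    return ulog.existOldBufferLog();
  }

  void beginBuffering() {
    // bufferUpdates() now writes to a dedicated buffer tlog, dropping any old
    // one first, so the caller no longer tracks a "replayed" flag.
    ulog.bufferUpdates();
  }
}
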
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 0a742e3..aa648dd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -97,7 +97,7 @@ public class ReplicateFromLeader {
                 new ModifiableSolrParams());
             CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
             cuc.setVersion(Long.parseLong(commitVersion));
-            updateLog.copyOverOldUpdates(cuc);
+            updateLog.commitAndSwitchToNewTlog(cuc);
             lastVersion = Long.parseLong(commitVersion);
           }
         });

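The rename above matches what the hook now does: when a pull replica finishes fetching a new index, it records the leader's commit version and switches to a fresh tlog instead of copying old updates forward. A rough, self-contained sketch under that reading (TlogSwitcher and onIndexFetched are hypothetical; commitAndSwitchToNewTlog mirrors the renamed UpdateLog method, which really takes a CommitUpdateCommand):

// Hypothetical stand-in for the single UpdateLog method used by the hook.
interface TlogSwitcher {
  void commitAndSwitchToNewTlog(long commitVersion);
}

class ReplicateHookSketch {
  static long onIndexFetched(TlogSwitcher ulog, String commitVersion) {
    long version = Long.parseLong(commitVersion);
    ulog.commitAndSwitchToNewTlog(version); // write a commit marker, open a new tlog
    return version;                         // caller stores this as lastVersion
  }
}
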
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
index 3534f62..f668540 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}.</li>
  * <li>encode the number of records in the tlog file in the last commit record. The number of records will be
 * decoded and reused if the tlog file is reopened. This is achieved by extending the constructor, and the
- * methods {@link #writeCommit(CommitUpdateCommand, int)} and {@link #getReader(long)}.</li>
+ * methods {@link #writeCommit(CommitUpdateCommand)} and {@link #getReader(long)}.</li>
  * </ul>
  */
 public class CdcrTransactionLog extends TransactionLog {
@@ -108,7 +108,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+  public long write(AddUpdateCommand cmd, long prevPointer) {
     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
 
     LogCodec codec = new LogCodec(resolver);
@@ -125,7 +125,7 @@ public class CdcrTransactionLog extends TransactionLog {
       codec.init(out);
       if (cmd.isInPlaceUpdate()) {
         codec.writeTag(JavaBinCodec.ARR, 6);
-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags);  // should just take one byte
+        codec.writeInt(UpdateLog.UPDATE_INPLACE);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeLong(prevPointer);
         codec.writeLong(cmd.prevVersion);
@@ -141,7 +141,7 @@ public class CdcrTransactionLog extends TransactionLog {
 
       } else {
         codec.writeTag(JavaBinCodec.ARR, 4);
-        codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
+        codec.writeInt(UpdateLog.ADD);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
           // if the update is received via cdcr source; add extra boolean entry
@@ -179,7 +179,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
+  public long writeDelete(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
 
     try {
@@ -190,7 +190,7 @@ public class CdcrTransactionLog extends TransactionLog {
       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 4);
-      codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
+      codec.writeInt(UpdateLog.DELETE);  // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeByteArray(br.bytes, br.offset, br.length);
       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
@@ -217,7 +217,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     try {
       checkWriteHeader(codec, null);
@@ -225,7 +225,7 @@ public class CdcrTransactionLog extends TransactionLog {
       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 4);
-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeStr(cmd.query);
       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
@@ -249,7 +249,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -261,7 +261,7 @@ public class CdcrTransactionLog extends TransactionLog {
         }
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 4);
-        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeTag(JavaBinCodec.INT); // Enforce the encoding of a plain integer, to simplify decoding
         fos.writeInt(numRecords + 1); // the number of records in the file - +1 to account for the commit operation being written

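With the `flags` parameter gone from the write methods, the leading int of each tlog record is now the bare operation id rather than an opcode OR'ed with flag bits such as FLAG_GAP. A self-contained sketch of the difference (ADD, OPERATION_MASK and FLAG_GAP echo UpdateLog constant names; the literal values below are illustrative):

class TlogHeaderSketch {
  static final int ADD = 0x01;            // illustrative opcode value
  static final int OPERATION_MASK = 0x0f; // low bits carried the operation
  static final int FLAG_GAP = 0x10;       // high flag bits, no longer written

  public static void main(String[] args) {
    int oldHeader = ADD | FLAG_GAP; // pre-commit: opcode OR'ed with flags
    int newHeader = ADD;            // post-commit: bare opcode
    System.out.println((oldHeader & OPERATION_MASK) == ADD); // readers had to mask
    System.out.println(newHeader == ADD);                    // no masking needed
  }
}
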
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
index 6b20204..bff1612 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
@@ -352,7 +352,6 @@ public class CdcrUpdateLog extends UpdateLog {
     long latestVersion = startingUpdates.getMaxRecentVersion();
     try {
       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the index)
       for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
@@ -389,9 +388,7 @@ public class CdcrUpdateLog extends UpdateLog {
    */
   private void copyBufferedUpdates(File tlogSrc, long offsetSrc, long latestVersion) {
     recoveryInfo = new RecoveryInfo();
-    recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
     state = State.BUFFERING;
-    operationFlags |= FLAG_GAP;
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 0f89016..8ed7d7a 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -166,20 +166,6 @@ public class HdfsTransactionLog extends TransactionLog {
     }
     return true;
   }
-  
-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
-  // This should only be used to roll back buffered updates, not actually applied updates.
-  @Override
-  public void rollback(long pos) throws IOException {
-    synchronized (this) {
-      assert snapshot_size == pos;
-      ensureFlushed();
-      // TODO: how do we rollback with hdfs?? We need HDFS-3107
-      fos.setWritten(pos);
-      assert fos.size() == pos;
-      numRecords = snapshot_numRecords;
-    }
-  }
 
   private void readHeader(FastInputStream fis) throws IOException {
     // read existing header
@@ -210,7 +196,7 @@ public class HdfsTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -223,7 +209,7 @@ public class HdfsTransactionLog extends TransactionLog {
         
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
index 7bb74d0..8ca4b1c 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
@@ -65,37 +65,6 @@ public class HdfsUpdateLog extends UpdateLog {
     this.confDir = confDir;
   }
   
-  // HACK
-  // while waiting for HDFS-3107, instead of quickly
-  // dropping, we slowly apply
-  // This is somewhat brittle, but current usage
-  // allows for it
-  @Override
-  public boolean dropBufferedUpdates() {
-    versionInfo.blockUpdates();
-    try {
-      if (state != State.BUFFERING) return false;
-      
-      if (log.isInfoEnabled()) {
-        log.info("Dropping buffered updates " + this);
-      }
-      
-      // since we blocked updates, this synchronization shouldn't strictly be
-      // necessary.
-      synchronized (this) {
-        if (tlog != null) {
-          // tlog.rollback(recoveryInfo.positionOfStart);
-        }
-      }
-      
-      state = State.ACTIVE;
-      operationFlags &= ~FLAG_GAP;
-    } finally {
-      versionInfo.unblockUpdates();
-    }
-    return true;
-  }
-  
   @Override
   public void init(PluginInfo info) {
     super.init(info);
@@ -186,6 +155,11 @@ public class HdfsUpdateLog extends UpdateLog {
         throw new RuntimeException("Problem creating directory: " + tlogDir, e);
       }
     }
+
+    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      existOldBufferLog = true;
+    }
     
     tlogFiles = getLogList(fs, tlogDir);
     id = getLastLogId() + 1; // add 1 since we will create a new log for the
@@ -241,7 +215,6 @@ public class HdfsUpdateLog extends UpdateLog {
     // non-complete tlogs.
     try (RecentUpdates startingUpdates = getRecentUpdates()) {
       startingVersions = startingUpdates.getVersions(getNumRecordsToKeep());
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the
       // index)
@@ -269,6 +242,23 @@ public class HdfsUpdateLog extends UpdateLog {
   public String getLogDir() {
     return tlogDir.toUri().toString();
   }
+
+  public static String[] getBufferLogList(FileSystem fs, Path tlogDir) {
+    final String prefix = BUFFER_TLOG_NAME+'.';
+    assert fs != null;
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = fs.listStatus(tlogDir, path -> path.getName().startsWith(prefix));
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to list old buffer tlogs", e);
+    }
+
+    String[] names = new String[fileStatuses.length];
+    for (int i = 0; i < fileStatuses.length; i++) {
+      names[i] = fileStatuses[i].getPath().getName();
+    }
+    return names;
+  }
   
   public static String[] getLogList(FileSystem fs, Path tlogDir) {
     final String prefix = TLOG_NAME + '.';
@@ -307,7 +297,35 @@ public class HdfsUpdateLog extends UpdateLog {
       IOUtils.closeQuietly(fs);
     }
   }
-  
+
+  @Override
+  protected void ensureBufferTlog() {
+    if (bufferTlog != null) return;
+    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
+    bufferTlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName),
+        globalStrings, tlogDfsReplication);
+  }
+
+  @Override
+  protected void deleteBufferLogs() {
+    // Delete old buffer logs
+    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      for (String oldBufferLogName : oldBufferTlog) {
+        Path f = new Path(tlogDir, oldBufferLogName);
+        try {
+          boolean s = fs.delete(f, false);
+          if (!s) {
+            log.error("Could not remove old buffer tlog file:" + f);
+          }
+        } catch (IOException e) {
+          // No need to bubble up this exception; it won't cause any problems during recovery
+          log.error("Could not remove old buffer tlog file: " + f, e);
+        }
+      }
+    }
+  }
+
   @Override
   protected void ensureLog() {
     if (tlog == null) {

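getBufferLogList() above is the HDFS-side scan for leftover buffer tlogs; a java.io analog for a local tlog directory looks roughly like the sketch below (the "buffer.tlog." prefix is an assumption standing in for BUFFER_TLOG_NAME + '.'):

import java.io.File;

class BufferTlogScan {
  // Assumed prefix; the real code derives it from UpdateLog.BUFFER_TLOG_NAME + '.'
  static final String BUFFER_TLOG_PREFIX = "buffer.tlog.";

  static String[] listBufferTlogs(File tlogDir) {
    // Same prefix filter as the FileSystem.listStatus call above, on java.io
    String[] names = tlogDir.list((dir, name) -> name.startsWith(BUFFER_TLOG_PREFIX));
    return names == null ? new String[0] : names;
  }

  public static void main(String[] args) {
    // Any leftover buffer tlog at startup flags an interrupted recovery.
    boolean existOldBufferLog = listBufferTlogs(new File("tlog")).length != 0;
    System.out.println("existOldBufferLog=" + existOldBufferLog);
  }
}
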
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ab316bbc/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 96a928c..2a23896 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -85,9 +85,6 @@ public class TransactionLog implements Closeable {
   Map<String,Integer> globalStringMap = new HashMap<>();
   List<String> globalStringList = new ArrayList<>();
 
-  long snapshot_size;
-  int snapshot_numRecords;
-
   // write a BytesRef as a byte array
   static final JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
     @Override
@@ -153,7 +150,7 @@ public class TransactionLog implements Closeable {
 
       // Parse tlog id from the filename
       String filename = tlogFile.getName();
-      id = Long.parseLong(filename.substring(filename.indexOf('.') + 1, filename.indexOf('.') + 20));
+      id = Long.parseLong(filename.substring(filename.lastIndexOf('.')+1));
 
       this.tlogFile = tlogFile;
       raf = new RandomAccessFile(this.tlogFile, "rw");
@@ -233,29 +230,6 @@ public class TransactionLog implements Closeable {
     return true;
   }
 
-  /** takes a snapshot of the current position and number of records
-   * for later possible rollback, and returns the position */
-  public long snapshot() {
-    synchronized (this) {
-      snapshot_size = fos.size();
-      snapshot_numRecords = numRecords;
-      return snapshot_size;
-    }
-  }
-
-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
-  // This should only be used to roll back buffered updates, not actually applied updates.
-  public void rollback(long pos) throws IOException {
-    synchronized (this) {
-      assert snapshot_size == pos;
-      fos.flush();
-      raf.setLength(pos);
-      fos.setWritten(pos);
-      assert fos.size() == pos;
-      numRecords = snapshot_numRecords;
-    }
-  }
-
   public long writeData(Object o) {
     @SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver);
     try {
@@ -346,17 +320,16 @@ public class TransactionLog implements Closeable {
 
   /**
    * Writes an add update command to the transaction log. This is not applicable for
-   * in-place updates; use {@link #write(AddUpdateCommand, long, int)}.
+   * in-place updates; use {@link #write(AddUpdateCommand, long)}.
    * (The previous pointer (applicable for in-place updates) is set to -1 while writing
    * the command to the transaction log.)
    * @param cmd The add update command to be written
-   * @param flags Options for writing the command to the transaction log
    * @return Returns the position pointer of the written update command
    * 
-   * @see #write(AddUpdateCommand, long, int)
+   * @see #write(AddUpdateCommand, long)
    */
-  public long write(AddUpdateCommand cmd, int flags) {
-    return write(cmd, -1, flags);
+  public long write(AddUpdateCommand cmd) {
+    return write(cmd, -1);
   }
 
   /**
@@ -365,10 +338,9 @@ public class TransactionLog implements Closeable {
    * @param cmd The add update command to be written
    * @param prevPointer The pointer in the transaction log which this update depends 
    * on (applicable for in-place updates)
-   * @param flags Options for writing the command to the transaction log
    * @return Returns the position pointer of the written update command
    */
-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+  public long write(AddUpdateCommand cmd, long prevPointer) {
     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
     
     LogCodec codec = new LogCodec(resolver);
@@ -386,14 +358,14 @@ public class TransactionLog implements Closeable {
       codec.init(out);
       if (cmd.isInPlaceUpdate()) {
         codec.writeTag(JavaBinCodec.ARR, 5);
-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags);  // should just take one byte
+        codec.writeInt(UpdateLog.UPDATE_INPLACE);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeLong(prevPointer);
         codec.writeLong(cmd.prevVersion);
         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
       } else {
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
+        codec.writeInt(UpdateLog.ADD);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
       }
@@ -422,7 +394,7 @@ public class TransactionLog implements Closeable {
     }
   }
 
-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
+  public long writeDelete(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
 
     try {
@@ -433,7 +405,7 @@ public class TransactionLog implements Closeable {
       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 3);
-      codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
+      codec.writeInt(UpdateLog.DELETE);  // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeByteArray(br.bytes, br.offset, br.length);
 
@@ -452,7 +424,7 @@ public class TransactionLog implements Closeable {
 
   }
 
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     try {
       checkWriteHeader(codec, null);
@@ -460,7 +432,7 @@ public class TransactionLog implements Closeable {
       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 3);
-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY);  // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeStr(cmd.query);
 
@@ -478,7 +450,7 @@ public class TransactionLog implements Closeable {
   }
 
 
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -490,7 +462,7 @@ public class TransactionLog implements Closeable {
         }
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT);  // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
 

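The id-parsing change above matters because the old slice, substring(indexOf('.') + 1, indexOf('.') + 20), assumed exactly one dot and a fixed 19-digit suffix; a name with a second dot, like the new buffer tlog's, would produce garbage. A quick standalone check (the example filenames are illustrative):

class TlogIdParse {
  static long parseId(String filename) {
    // Post-commit parsing: everything after the last dot is the id
    return Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1));
  }

  public static void main(String[] args) {
    System.out.println(parseId("tlog.0000000000000000005"));        // 5
    System.out.println(parseId("buffer.tlog.0000000000000000007")); // 7, also works
  }
}
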
