lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r1585682 [1/2] - in /lucene/dev/branches/lucene5438/lucene: core/src/java/org/apache/lucene/index/ core/src/java/org/apache/lucene/store/ replicator/src/java/org/apache/lucene/replicator/ replicator/src/test/org/apache/lucene/replicator/ te...
Date Tue, 08 Apr 2014 10:43:02 GMT
Author: mikemccand
Date: Tue Apr  8 10:43:01 2014
New Revision: 1585682

URL: http://svn.apache.org/r1585682
Log:
LUCENE-5438: checkpoint current [broken] state

Modified:
    lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
    lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
    lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
    lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
    lucene/dev/branches/lucene5438/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java

Modified: lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java (original)
+++ lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java Tue Apr  8 10:43:01 2014
@@ -415,6 +415,7 @@ public final class SegmentInfos implemen
   private void write(Directory directory) throws IOException {
 
     String segmentFileName = getNextSegmentFileName();
+    assert directory.fileExists(segmentFileName) == false: "segments file " + segmentFileName + " already exists!";
     
     // Always advance the generation on write:
     if (generation == -1) {

Modified: lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java (original)
+++ lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java Tue Apr  8 10:43:01 2014
@@ -22,13 +22,13 @@ import java.io.IOException;
  * 
  * @lucene.internal
  */
-final class RateLimitedIndexOutput extends BufferedIndexOutput {
+public final class RateLimitedIndexOutput extends BufferedIndexOutput {
   
   private final IndexOutput delegate;
   private final BufferedIndexOutput bufferedDelegate;
   private final RateLimiter rateLimiter;
 
-  RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
+  public RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
     // TODO should we make buffer size configurable
     if (delegate instanceof BufferedIndexOutput) {
       bufferedDelegate = (BufferedIndexOutput) delegate;

Modified: lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java (original)
+++ lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java Tue Apr  8 10:43:01 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -77,8 +78,11 @@ public class SlowChecksumDirectory exten
       }
 
       long value = checksum.getValue();
-      System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + ": record pending checksum=" + value);
+      System.out.println("[" + Thread.currentThread().getName() + "] id=" + checksums.id + " " + name + ": record pending checksum=" + value + " (current checksums=" + getChecksum(name) + ") len=" + in.fileLength(name));
       pendingChecksums.put(name, value);
+
+      // In case we overwrote this file:
+      checksums.remove(name);
     } finally {
       input.close();
     }
@@ -97,7 +101,7 @@ public class SlowChecksumDirectory exten
   }
 
   public void sync(Collection<String> names) throws IOException {
-    System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " sync " + names);
+    System.out.println("[" + Thread.currentThread().getName() + "] id=" + checksums.id + " sync " + names);
     in.sync(names);
     for(String name : names) {
       Long v = pendingChecksums.get(name);
@@ -116,7 +120,7 @@ public class SlowChecksumDirectory exten
 
   @Override
   public void deleteFile(String name) throws IOException {
-    System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + " now delete");
+    System.out.println("[" + Thread.currentThread().getName() + "] id=" + checksums.id + " " + name + " now delete");
     in.deleteFile(name);
     pendingChecksums.remove(name);
     checksums.remove(name);
@@ -156,6 +160,7 @@ public class SlowChecksumDirectory exten
       this.id = id;
       this.dir = dir;
       long maxGen = -1;
+      Set<String> seen = new HashSet<>();
       for (String fileName : dir.listAll()) {
         if (fileName.startsWith(FILE_NAME_PREFIX)) {
           long gen = Long.parseLong(fileName.substring(1+FILE_NAME_PREFIX.length()),
@@ -163,6 +168,8 @@ public class SlowChecksumDirectory exten
           if (gen > maxGen) {
             maxGen = gen;
           }
+        } else {
+          seen.add(fileName);
         }
       }
 
@@ -177,7 +184,15 @@ public class SlowChecksumDirectory exten
           for(int i=0;i<count;i++) {
             String name = in.readString();
             long checksum = in.readLong();
-            checksums.put(name, checksum);
+            // Must filter according to what's in the
+            // directory now because we may have deleted
+            // some files but then crashed and then our
+            // checksum state is invalid:
+            if (seen.contains(name)) {
+              checksums.put(name, checksum);
+            } else {
+            System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + name + ": skip this checksum file on init: it does not exist");
+            }
           }
           nextWriteGen = maxGen+1;
           System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + genToFileName(maxGen) + " loaded checksums");

Modified: lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java (original)
+++ lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java Tue Apr  8 10:43:01 2014
@@ -19,7 +19,9 @@ package org.apache.lucene.replicator;
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -29,9 +31,15 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.zip.Adler32;
@@ -79,6 +87,8 @@ import org.apache.lucene.store.MockDirec
 import org.apache.lucene.store.NRTCachingDirectory;
 import org.apache.lucene.store.RAMDirectory;
 import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.store.RateLimitedIndexOutput;
+import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.LineFileDocs;
@@ -89,8 +99,25 @@ import org.apache.lucene.util.TreeLogger
 import org.apache.lucene.util._TestUtil;
 import org.junit.AfterClass;
 
+// nocommit add SLM
+
+// nocommit also allow downgrade of Master to Replica,
+// instead of Master.close then Replica init
+
+// nocommit make sure we are not over-IncRef'ing infos on master
+
+// nocommit test replicas that are slow to copy
+
 // nocommit add fang: master crashes
 
+// nocommit provoke more "going backwards", e.g. randomly
+// sometimes shutdown whole cluster and pick random node to
+// be the new master
+
+// nocommit should we have support for "flush as frequently
+// as you can"?  or at least, "do not flush so frequently
+// that replicas can't finish copying before next flush"?
+
 // nocommit what about network partitioning
 
 // nocommit make MDW throw exceptions sometimes
@@ -99,36 +126,32 @@ import org.junit.AfterClass;
 
 // nocommit test rare bit errors during copy
 
-// nocommit: master's IW must not do reader pooling?  else
-// deletes are not pushed to disk?
-
 // nocommit test slow replica
 
-// nocommit make flush sync async
-
 // nocommit test replica that "falls out" because it's too
 // slow and then tries to join back again w/o having
 // "properly" restarted
 
-// nocommit also test sometimes starting up new master from
-// a down replica (ie, not just promoting an already running
-// replica)
-
 // nocommit rewrite the test so each node has its own
 // threads, to be closer to the concurrency we'd "really"
 // see across N machines
 
-// nocommit checksum_N files never delete!
+// nocommit also allow replicas pulling files from replicas;
+// they need not always come from master
 
 @SuppressCodecs({ "SimpleText", "Memory", "Direct" })
 public class TestNRTReplication extends LuceneTestCase {
 
   static volatile Master master;
-  static ReentrantLock masterLock = new ReentrantLock();
+  static final AtomicInteger masterCount = new AtomicInteger();
+  static final Lock masterLock = new ReentrantLock();
+  static Object[] nodes;
 
   @AfterClass
   public static void afterClass() {
+    System.out.println("TEST: now afterClass");
     master = null;
+    nodes = null;
   }
 
   public void test() throws Throwable {
@@ -141,90 +164,9 @@ public class TestNRTReplication extends 
     }
   }
 
-  /*
-  private static Map<String,Long> globalState = new HashMap<>();
-
-  private static void setGlobalStateKeyVal(String key, long value) {
-    Long cur = globalState.get(key);
-    assert cur == null || cur <= value;
-    TreeLogger.log("  push " + key + " cur=" + cur + " new=" + value);
-    globalState.put(key, value);
-  }
-  */
-
-  /** Called just before IndexWriter finishCommit on the
-   *  current master, to push "next write gens" to global
-   *  state. */
-  /*
-  static void pushGlobalState(SegmentInfos infos) {
-    TreeLogger.log("TEST: now pushGlobalState");
-    TreeLogger.start("pushGlobalState");    
-    // NOTE: assumed externally sync'd, i.e. only one master
-    // across the cluster at a time
-    setGlobalStateKeyVal("segmentsGen", infos.getGeneration());
-    // nocommit weird that we must add 2 :)
-    setGlobalStateKeyVal("segmentsVersion", infos.getVersion()+2);
-
-    // Used to generate next segment file name:
-    setGlobalStateKeyVal("segmentsCounter", (long) infos.counter);
-
-    for(SegmentCommitInfo info : infos) {
-      setGlobalStateKeyVal(info.info.name + "_delGen",  info.getNextDelGen());
-      setGlobalStateKeyVal(info.info.name + "_fieldInfosGen",  info.getNextFieldInfosGen());
-    }
-    TreeLogger.end("pushGlobalState");
-  }
-  */
-
-  /** Called just before init of a new writer, to pull the
-   *  "next write gens" and set them in the current infos. */
-  /*
-  static void pullGlobalState(SegmentInfos infos) {
-    TreeLogger.log("TEST: now pullGlobalState");
-    TreeLogger.start("pullGlobalState");
-    Long v = globalState.get("segmentsGen");
-    if (v == null) {
-      TreeLogger.log("no global state yet; skip");
-      TreeLogger.end("pullGlobalState");
-      return;
-    }
-    TreeLogger.log("pull global gen=" + v + " vs cur=" + infos.getGeneration());
-    assert infos.getGeneration() <= v.longValue(): "infos.generation=" + infos.getGeneration() + " global.generation=" + v;
-    infos.setGeneration(v.longValue());
-
-    v = globalState.get("segmentsVersion");
-    assert v != null;
-    assert infos.version <= v.longValue(): "infos.version=" + infos.version + " global.version=" + v;
-    TreeLogger.log("pull global version=" + v + " vs cur=" + infos.version);
-    infos.version = v.longValue();
-
-    v = globalState.get("segmentsCounter");
-    assert v != null;
-    assert infos.counter <= v.longValue(): "infos.counter=" + infos.counter + " global.counter=" + v;
-    TreeLogger.log("pull global counter=" + v + " vs cur=" + infos.counter);
-    infos.counter = v.intValue();
-
-    for(SegmentCommitInfo info : infos) {
-      String key = info.info.name + "_delGen";
-      v = globalState.get(key);
-      long value = v == null ? 1 : v.longValue();
-      assert info.getNextDelGen() <= value: "seg=" + info.info.name + " delGen=" + info.getNextDelGen() + " vs global=" + value;
-      TreeLogger.log("pull global del gen=" + v + " for seg=" + info.info.name + " vs cur=" + info.getNextDelGen());
-      info.setNextDelGen(value);
-
-      key = info.info.name + "_fieldInfosGen";
-      v = globalState.get(key);
-      value = v == null ? 1 : v.longValue();
-      assert info.getNextFieldInfosGen() <= value: "seg=" + info.info.name + " fieldInfosGen=" + info.getNextFieldInfosGen() + " vs global=" + value;
-      TreeLogger.log("pull global fieldInfos gen= " + v + " for seg=" + info.info.name + " vs cur=" + info.getNextFieldInfosGen());
-      info.setNextFieldInfosGen(value);
-    }
-    TreeLogger.end("pullGlobalState");
-  }
-  */
-  
   private void _test() throws Exception {
 
+    Thread.currentThread().setName("main");
     TreeLogger.setLogger(new TreeLogger("main"));
 
     // Maps all segmentInfos.getVersion() we've seen, to the
@@ -232,18 +174,20 @@ public class TestNRTReplication extends 
     // *:* search we verify the totalHits is correct:
     final Map<Long,Integer> versionDocCounts = new ConcurrentHashMap<Long,Integer>();
 
-    int numDirs = 1+_TestUtil.nextInt(random(), 2, 4);
+    int numDirs = 1+_TestUtil.nextInt(random(), 2, 6);
     // nocommit
-    numDirs = 6;
+    //int numDirs = 2;
+
     final File[] dirs = new File[numDirs];
-    // One Master and N-1 Replica:
-    final Object[] nodes = new Object[numDirs];
+
+    // One Master (initially node 0) and N-1 Replica:
+    nodes = new Object[numDirs];
     System.out.println("TEST: " + nodes.length + " nodes");
     for(int i=0;i<numDirs;i++) {
-      dirs[i] = _TestUtil.getTempDir("NRTReplication");
+      dirs[i] = _TestUtil.getTempDir("NRTReplication." + i + "_");
       if (i > 0) {
+        // Some replicas don't start on init:
         if (random().nextInt(10) < 7) {
-          // Some replicas don't start on init:
           nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
         } else {
           System.out.println("TEST: skip replica " + i + " startup");
@@ -251,12 +195,7 @@ public class TestNRTReplication extends 
       }
     }
 
-    nodes[0] = master = new Master(dirs[0], 0, nodes, versionDocCounts);
-
-    // Periodically stops/starts/commits replicas, moves master:
-    CommitThread commitThread = new CommitThread(dirs, nodes, versionDocCounts);
-    commitThread.setName("commitThread");
-    commitThread.start();
+    nodes[0] = master = new Master(dirs[0], 0, versionDocCounts);
 
     // nocommit test graceful full shutdown / restart
 
@@ -266,7 +205,7 @@ public class TestNRTReplication extends 
     //long endTime = System.currentTimeMillis() + 10000;
 
     while (System.currentTimeMillis() < endTime) {
-      Thread.sleep(_TestUtil.nextInt(random(), 2, 20));
+      Thread.sleep(_TestUtil.nextInt(random(), 2, 50));
 
       assert master != null;
 
@@ -274,175 +213,192 @@ public class TestNRTReplication extends 
       TreeLogger.start("replicate");
 
       SegmentInfos infos;
-      boolean closeMaster = false;
-      if (master == null) {
-        infos = null;
+      boolean closeMaster = random().nextInt(100) == 57;
+      if (closeMaster) {
+        TreeLogger.log("top: id=" + master.id + " now move master");
+        // Commits & closes current master and pull the
+        // infos of the final commit:
+        master.close(random().nextBoolean());
+        TreeLogger.log("top: done shutdown master");
       } else {
-        if (random().nextInt(100) == 57) {
-          closeMaster = true;
-          TreeLogger.log("top: id=" + master.id + " now move master");
-          // Commits & closes current master and pull the
-          // infos of the final commit:
-          masterLock.lock();
-          try {
-            infos = master.close(random().nextBoolean());
-          } finally {
-            masterLock.unlock();
-          }
-          TreeLogger.log("top: done shutdown master; version=" + infos.version);
-        } else {
 
-          // Have writer do a full flush, and return the
-          // resulting segments, protected from deletion
-          // (incRef'd) just while we copy the files out to
-          // the replica (s).  This is just like pulling an
-          // NRT reader, except we don't actually open the
-          // readers on the newly flushed segments:
-          TreeLogger.log("flush current master");
-          infos = master.writer.w.flushAndIncRef();
-        }
-      }
+        // Have writer do a full flush, and return the
+        // resulting segments, protected from deletion
+        // (incRef'd) just while we copy the files out to
+        // the replica (s).  This is just like pulling an
+        // NRT reader, except we don't actually open the
+        // readers on the newly flushed segments:
+        TreeLogger.log("flush current master");
+        master.flush();
+        TreeLogger.log("done flush current master");
+      }
+
+      CopyState copyState = master.getCopyState();
+
+      // nocommit also allow downgrade Master -> Replica,
+      // NOT a full close
+
+      int count = docCount(copyState.infos);
+      TreeLogger.log("record version=" + copyState.version + " count=" + count + " segs=" + copyState.infos.toString(copyState.dir));
+      Integer oldCount = versionDocCounts.put(copyState.version, count);
 
-      if (infos != null) {
+      // Refresh the local searcher on master:
+      if (closeMaster == false) {
+        master.mgr.setCurrentInfos(copyState.infos);
+        master.mgr.maybeRefresh();
+      }
 
-        int count = docCount(infos);
-        TreeLogger.log("record version=" + infos.version + " count=" + count + " segs=" + infos.toString(master.dir));
-        Integer oldCount = versionDocCounts.put(infos.version, count);
-
-        // nocommit cannot do this: versions can go backwards
-        //if (oldCount != null) {
-        //assertEquals("version=" + infos.version + " oldCount=" + oldCount + " newCount=" + count, oldCount.intValue(), count);
-        //}
-
-        // nocommit need to do this concurrently w/ pushing
-        // out to replicas:
-        if (closeMaster == false) {
-          master.mgr.refresh(infos);
-        }
-
-        // nocommit can we have commit commit the "older" SIS?
-        // they will commit quickly since the OS will have
-        // already moved those bytes to disk...
-
-        int totDocCount = docCount(infos);
-        String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir.getDelegate()).sizeInBytes();
-
-        TreeLogger.log("replicate docCount=" + totDocCount + " version=" + infos.version + extra + " segments=" + infos.toString(master.dir));
-
-        // Convert infos to byte[], to send "on the wire":
-        RAMOutputStream out = new RAMOutputStream();
-        infos.write(out);
-        byte[] infosBytes = new byte[(int) out.getFilePointer()];
-        out.writeTo(infosBytes, 0);
-        
-        // nocommit test master crash (promoting replica to master)
-
-        // nocommit do this sync in separate threads
-        Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, infos.files(master.dir, false));
-
-        // nocommit simulate super-slow replica: it should not
-        // hold up the copying of other replicas, nor new
-        // flushing; the copy of a given SIS to a given
-        // replica should be fully concurrent/bg
-        int upto = 0;
-        for(Object n : nodes) {
-          if (n != null && n instanceof Replica) {
-            Replica r = (Replica) n;
+      // nocommit break this into separate tests, so we can
+      // test the "clean" case where versions are "correct":
 
-            // nocommit improve this: load each file ONCE,
-            // push to the N replicas that need it
-            try {
-              r.sync(master.dir, filesMetaData, infosBytes, infos.version);
-            } catch (AlreadyClosedException ace) {
-              // Ignore this: it means the replica shut down
-              // while we were trying to sync.  This
-              // "approximates" an exception the master would
-              // see trying to push file bytes to a replica
-              // that was just taken offline.
-            } catch (Exception e) {
-              TreeLogger.log("TEST FAIL: replica " + r.id, e);
-              throw e;
-            }
-          } else if (n == null) {
-            TreeLogger.log("id=" + upto + " skip down replica");
-          }
-          upto++;
+      // nocommit cannot do this: versions can go backwards
+      //if (oldCount != null) {
+      //assertEquals("version=" + infos.version + " oldCount=" + oldCount + " newCount=" + count, oldCount.intValue(), count);
+      //}
+
+      // nocommit can we have commit commit the "older" SIS?
+      // they will commit quickly since the OS will have
+      // already moved those bytes to disk...
+
+      String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir.getDelegate()).sizeInBytes();
+
+      TreeLogger.log("replicate docCount=" + count + " version=" + copyState.version + extra + " segments=" + copyState.infos.toString(copyState.dir));
+
+      // nocommit test master crash (promoting replica to master)
+
+      // nocommit simulate super-slow replica: it should not
+      // hold up the copying of other replicas, nor new
+      // flushing; the copy of a given SIS to a given
+      // replica should be fully concurrent/bg
+
+      // Notify all running replicas that they should now
+      // pull the new flush over:
+      int upto = 0;
+      for(Object n : nodes) {
+        if (n != null && n instanceof Replica) {
+          Replica r = (Replica) n;
+          // nocommit can we "broadcast" the new files
+          // instead of each replica pulling its own copy
+          // ...
+          TreeLogger.log("id=" + upto + ": signal new flush");
+          r.newFlush();
+        } else if (n == null) {
+          TreeLogger.log("id=" + upto + " skip down replica");
         }
+        upto++;
       }
 
-      if (closeMaster == false) {
-        // Done pushing to all replicas so we now release
-        // the files on master, so IW is free to delete if it
-        // needs to:
-        master.setInfos(infos);
-      } else {
+      master.releaseCopyState(copyState);
 
-        // clean this up:
-        TreeLogger.log("close old master dir dir.listAll()=" + Arrays.toString(master.dir.listAll()));
+      if (closeMaster) {
+        if (random().nextBoolean()) {
+          TreeLogger.log("top: id=" + master.id + " now waitIdle");
+          master.waitIdle();
+          TreeLogger.log("top: id=" + master.id + " done waitIdle");
+        } else {
+          TreeLogger.log("top: id=" + master.id + " skip waitIdle");
+          Thread.sleep(random().nextInt(5));
+        }
+
+        TreeLogger.log("top: id=" + master.id + " close old master dir dir.listAll()=" + Arrays.toString(master.dir.listAll()));
         master.dir.close();
 
         masterLock.lock();
         try {
+          masterCount.incrementAndGet();
+        } finally {
+          masterLock.unlock();
+        }
 
-          nodes[master.id] = null;
-
-          // nocommit go back to picking random replica
+        nodes[master.id] = null;
 
-          // Must pick newest replica to promote, else we
-          // can't overwrite open files when trying to copy
-          // to the newer replicas:
-          int bestIDX = -1;
-          long highestVersion = -1;
-          for(int idx=0;idx<nodes.length;idx++) {
-            if (nodes[idx] instanceof Replica) {
-              Replica r = (Replica) nodes[idx];
-              long version = r.mgr.getCurrentInfos().version;
-              if (version > highestVersion) {
-                bestIDX = idx;
-                highestVersion = version;
-              }
+        // nocommit make sure we test race here, where
+        // replica is coming up just as we are electing a
+        // new master
+
+        // Must pick newest replica to promote, else we
+        // can't overwrite open files when trying to copy
+        // to the newer replicas:
+        int bestIDX = -1;
+        long highestVersion = -1;
+        for (int idx=0;idx<nodes.length;idx++) {
+          if (nodes[idx] instanceof Replica) {
+            Replica r = (Replica) nodes[idx];
+            long version = r.mgr.getCurrentInfos().version;
+            TreeLogger.log("top: id=" + r.id + " check version=" + version);
+            if (version > highestVersion) {
+              bestIDX = idx;
+              highestVersion = version;
+              TreeLogger.log("top: id=" + r.id + " check version=" + version + " max so far");
             }
           }
+        }
 
-          int idx;
-          if (bestIDX != -1) {
-            idx = bestIDX;
-          } else {
-            // All replicas are down; it doesn't matter
-            // which one we pick
-            idx = random().nextInt(nodes.length);
-          }
+        int idx;
+        if (bestIDX != -1) {
+          idx = bestIDX;
+        } else {
+          // All replicas are down; it doesn't matter
+          // which one we pick
+          idx = random().nextInt(nodes.length);
+        }
 
-          if (nodes[idx] == null) {
-            // Start up Master from scratch:
-            TreeLogger.log("top: id=" + idx + " promote down node to master");
-            nodes[idx] = master = new Master(dirs[idx], idx, nodes, versionDocCounts);
-          } else {
-            // Promote a running replica to Master:
-            assert nodes[idx] instanceof Replica;
-            TreeLogger.log("top: id=" + idx + " promote replica to master");
-            master = new Master((Replica) nodes[idx], nodes);
-            nodes[idx] = master;
-          }
-        } finally {
-          masterLock.unlock();
+        if (nodes[idx] == null) {
+          // Start up Master from scratch:
+          TreeLogger.log("top: id=" + idx + " promote down node to master");
+          nodes[idx] = master = new Master(dirs[idx], idx, versionDocCounts);
+        } else {
+          // Promote a running replica to Master:
+          assert nodes[idx] instanceof Replica;
+          TreeLogger.log("top: id=" + idx + " promote replica to master");
+          master = new Master((Replica) nodes[idx]);
+          nodes[idx] = master;
+        }
+      } else {
+        if (random().nextInt(100) == 17) {
+          TreeLogger.log("top: id=" + master.id + " commit master");
+          master.commit();
         }
       }
-      TreeLogger.end("replicate");
-    }
 
-    System.out.println("TEST: stop commit thread");
-    commitThread.finish();
+      // Maybe restart a down replica, or commit / shutdown
+      // / crash one:
+      for(int i=0;i<nodes.length;i++) {
+        if (nodes[i] == null && random().nextInt(20) == 17) {
+          // Restart this replica
+          try {
+            nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
+          } catch (Throwable t) {
+            TreeLogger.log("top: id=" + i + " FAIL startup", t);
+            throw t;
+          }
+        } else {
+          if (nodes[i] instanceof Replica) {
+            Replica r = (Replica) nodes[i];
 
-    if (master != null) {
-      System.out.println("TEST: close master");
-      masterLock.lock();
-      try {
-        master.close(random().nextBoolean());
-        master.dir.close();
-      } finally {
-        masterLock.unlock();
+            if (random().nextInt(100) == 17) {
+              TreeLogger.log("top: id=" + i + " commit replica");
+              r.commit(false);
+            }
+
+            if (random().nextInt(100) == 17) {
+              // Now shutdown this replica
+              TreeLogger.log("top: id=" + i + " shutdown replica");
+              r.shutdown();
+              nodes[i] = null;
+              break;
+            } else if (random().nextInt(100) == 17) {
+              // Now crash the replica
+              TreeLogger.log("top: id=" + i + " crash replica");
+              r.crash();
+              nodes[i] = null;
+              break;
+            }
+          }
+        }
       }
+
+      TreeLogger.end("replicate");
     }
 
     System.out.println("TEST: close replicas");
@@ -451,14 +407,20 @@ public class TestNRTReplication extends 
         ((Replica) n).shutdown();
       }
     }
+
+    if (master != null) {
+      System.out.println("TEST: close master");
+      master.close(random().nextBoolean());
+      master.dir.close();
+    }
   }
 
   static class FileMetaData {
-    public final long sizeInBytes;
+    public final long length;
     public final long checksum;
 
-    public FileMetaData(long sizeInBytes, long checksum) {
-      this.sizeInBytes = sizeInBytes;
+    public FileMetaData(long length, long checksum) {
+      this.length = length;
       this.checksum = checksum;
     }
   }
@@ -466,47 +428,14 @@ public class TestNRTReplication extends 
   static Map<String,FileMetaData> getFilesMetaData(Master master, Collection<String> files) throws IOException {
     Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>();
     for(String file : files) {
-      Long checksum = master.dir.getChecksum(file);
-      assert checksum != null;
-      filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), checksum.longValue()));
+      filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), master.dir.getChecksum(file)));
     }
 
     return filesMetaData;
   }
 
-  static Set<String> copyFiles(SlowChecksumDirectory src, Replica dst, Map<String,FileMetaData> filesMetaData, boolean lowPriority) throws IOException {
+  static Map<String,FileMetaData> copyFiles(SlowChecksumDirectory src, Replica dst, Map<String,FileMetaData> files, boolean lowPriority, long totBytes) throws IOException {
     long t0 = System.currentTimeMillis();
-    long totBytes = 0;
-    Set<String> toCopy = new HashSet<String>();
-    for(Map.Entry<String,FileMetaData> ent : filesMetaData.entrySet()) {
-      String fileName = ent.getKey();
-      // nocommit remove now unused metaData.checksum
-      FileMetaData metaData = ent.getValue();
-      Long srcChecksum0 = src.getChecksum(fileName);
-      assert srcChecksum0 != null: "id=" + dst.id + " name=" + fileName;
-      long srcChecksum = srcChecksum0.longValue();
-
-      Long checksum = dst.dir.getChecksum(fileName);
-      if (dst.dir.fileExists(fileName) == false) {
-        TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [does not exist] length=" + metaData.sizeInBytes + " srcChecksum=" + srcChecksum);
-        toCopy.add(fileName);
-        totBytes += metaData.sizeInBytes;
-      } else if (dst.dir.fileLength(fileName) != metaData.sizeInBytes) {
-        TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [different file length old=" + dst.dir.fileLength(fileName) + " new=" + metaData.sizeInBytes + "]");
-        toCopy.add(fileName);
-        totBytes += metaData.sizeInBytes;
-      } else if (checksum == null) {
-        TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [no checksum] length=" + metaData.sizeInBytes);
-        toCopy.add(fileName);
-        totBytes += metaData.sizeInBytes;
-      } else if (checksum.longValue() != srcChecksum) {
-        TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [wrong checksum: cur=" + checksum.longValue() + " new=" + srcChecksum + "] length=" + metaData.sizeInBytes);
-        toCopy.add(fileName);
-        totBytes += metaData.sizeInBytes;
-      } else {
-        TreeLogger.log("id=" + dst.id + " " + fileName + " skip copy checksum=" + srcChecksum + " file.length=" + metaData.sizeInBytes);
-      }
-    }
 
     // nocommit should we "organize" the files to be copied
     // by segment name?  so that NRTCachingDir can
@@ -520,47 +449,32 @@ public class TestNRTReplication extends 
       // nocommit can we get numDocs?
       ioContext = new IOContext(new FlushInfo(0, totBytes));
     }
-    for(String f : toCopy) {
-      long bytes = src.fileLength(f);
-      //System.out.println("  copy " + f + " (" + bytes + " bytes)");
-      totBytes += bytes;
-      TreeLogger.log("id=" + dst.id + " " + f + " copy file");
-
-      dst.copyOneFile(dst.id, src, f, ioContext);
 
-      // nocommit make test that exercises this
-      // Make sure no bits flipped during copy
-      Long v1 = dst.dir.getChecksum(f);
-      assert v1 != null;
+    CopyResult copyResult = dst.fileCopier.add(src, files, lowPriority, ioContext);
 
-      Long v2 = src.getChecksum(f);
-      assert v2 != null;
-
-      if (v1.longValue() != v2.longValue()) {
-        throw new IOException("id=" + dst.id + " " + f + ": copy failed: wrong checksums src=" + v2 + " vs dst=" + v1);
-      }
+    try {
+      copyResult.done.await();
+    } catch (InterruptedException ie) {
+      // nocommit
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ie);
+    }
 
-      if (lowPriority) {
-        // Rate limit low priority (copying a merged segment):
-        // nocommit use rate limiter
-        try {
-          Thread.sleep(bytes/100000);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(ie);
-        }
-      }
+    if (copyResult.failed.get()) {
+      TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + " failed to copy some files");
+      return null;
     }
+    
     long t1 = System.currentTimeMillis();
-    TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + toCopy.size() + " files (of " + filesMetaData.size() + "); sizeInBytes=" + ((NRTCachingDirectory) dst.dir.getDelegate()).sizeInBytes());
+    TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + files.size() + " files; nrtDir.sizeInBytes=" + ((NRTCachingDirectory) dst.dir.getDelegate()).sizeInBytes());
 
-    return toCopy;
+    return files;
   }
 
-  /** Like SearcherManager, except it refreshes via a
-   *  provided (NRT) SegmentInfos. */
+  /** Like SearcherManager, except it refreshes via an
+   *  externally provided (NRT) SegmentInfos. */
   private static class InfosSearcherManager extends ReferenceManager<IndexSearcher> {
-    private SegmentInfos currentInfos;
+    private volatile SegmentInfos currentInfos;
     private final Directory dir;
 
     public InfosSearcherManager(Directory dir, int id, SegmentInfos infosIn) throws IOException {
@@ -598,17 +512,19 @@ public class TestNRTReplication extends 
     public SegmentInfos getCurrentInfos() {
       return currentInfos;
     }
-    
-    public void refresh(SegmentInfos infos) throws IOException {
+
+    public void setCurrentInfos(SegmentInfos infos) {
       if (currentInfos != null) {
         // So that if we commit, we will go to the next
         // (unwritten so far) generation:
         infos.updateGeneration(currentInfos);
+        TreeLogger.log("mgr.setCurrentInfos: carry over infos gen=" + infos.getSegmentsFileName());
       }
       currentInfos = infos;
-      maybeRefresh();
     }
 
+    // nocommit who manages the "ref" for currentInfos?
+
     @Override
     protected IndexSearcher refreshIfNeeded(IndexSearcher old) throws IOException {
       List<AtomicReader> subs;
@@ -625,75 +541,19 @@ public class TestNRTReplication extends 
     }
   }
 
-  static class CommitThread extends Thread {
-    private final File[] dirs;
-    private final Object[] nodes;
-    private final Map<Long,Integer> versionDocCounts;
-    private volatile boolean stop;
-
-    public CommitThread(File[] dirs, Object[] nodes, Map<Long,Integer> versionDocCounts) {
-      this.dirs = dirs;
-      this.nodes = nodes;
-      this.versionDocCounts = versionDocCounts;
-    }
+  private static class CopyState {
+    public final SlowChecksumDirectory dir;
+    public final Map<String,FileMetaData> files;
+    public final long version;
+    public final byte[] infosBytes;
+    public final SegmentInfos infos;
 
-    @Override
-    public void run() {
-      TreeLogger.setLogger(new TreeLogger("commit"));
-      try {
-        while (stop == false) {
-          Thread.sleep(_TestUtil.nextInt(random(), 10, 30));
-          int i = random().nextInt(nodes.length);
-
-          masterLock.lock();
-          try {
-            Object n = nodes[i];
-            if (n != null) {
-              if (n instanceof Replica) {
-                Replica r = (Replica) n;
-                if (random().nextInt(100) == 17) {
-                  TreeLogger.log("top: id=" + i + " commit");
-                  r.commit(false);
-                }
-                if (random().nextInt(100) == 17) {
-                  // Shutdown this replica
-                  nodes[i] = null;
-                  TreeLogger.log("top: id=" + i + " shutdown replica");
-                  r.shutdown();
-                } else if (random().nextInt(100) == 17) {
-                  // Crash the replica
-                  nodes[i] = null;
-                  TreeLogger.log("top: id=" + i + " crash replica");
-                  r.crash();
-                }
-              } else if (master != null && master.isClosed() == false) {
-                // Randomly commit master:
-                if (random().nextInt(100) == 17) {
-                  TreeLogger.log("top: id=" + i + " commit master");
-                  master.commit();
-                }
-              }
-            } else if (random().nextInt(20) == 17) {
-              // Restart this replica
-              try {
-                nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
-              } catch (Throwable t) {
-                TreeLogger.log("top: id=" + i + " FAIL startup", t);
-                throw t;
-              }
-            }
-          } finally {
-            masterLock.unlock();
-          }
-        }
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void finish() throws InterruptedException {
-      stop = true;
-      join();
+    public CopyState(SlowChecksumDirectory dir, Map<String,FileMetaData> files, long version, byte[] infosBytes, SegmentInfos infos) {
+      this.dir = dir;
+      this.files = files;
+      this.version = version;
+      this.infosBytes = infosBytes;
+      this.infos = infos;
     }
   }
 
@@ -703,31 +563,34 @@ public class TestNRTReplication extends 
     final Set<String> finishedMergedSegments = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
 
     final SlowChecksumDirectory dir;
-    final Object[] nodes;
 
     final RandomIndexWriter writer;
     final InfosSearcherManager mgr;
     final SearchThread searchThread;
     final IndexThread indexThread;
     final int id;
-    private boolean isClosed;
+    private volatile boolean isClosed;
+    private AtomicInteger infosRefCount = new AtomicInteger();
 
     SegmentInfos lastInfos;
 
     /** Start up a master from scratch. */
     public Master(File path,
-                  int id, Object[] nodes, Map<Long,Integer> versionDocCounts) throws IOException {
+                  int id, Map<Long,Integer> versionDocCounts) throws IOException {
       final MockDirectoryWrapper dirOrig = newMockFSDirectory(path);
 
       // In some legitimate cases we will double-write:
       dirOrig.setPreventDoubleWrite(false);
 
       this.id = id;
-      this.nodes = nodes;
 
       // nocommit put back
       dirOrig.setCheckIndexOnClose(false);
 
+      // Master may legitimately close while replicas are
+      // still copying from it:
+      dirOrig.setAllowCloseWithOpenFiles(true);
+
       dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(dirOrig, 1.0, 10.0));
       //((NRTCachingDirectory) master).VERBOSE = true;
       SegmentInfos infos = new SegmentInfos();
@@ -736,8 +599,6 @@ public class TestNRTReplication extends 
       } catch (IndexNotFoundException infe) {
       }
 
-      //pullGlobalState(infos);
-
       mgr = new InfosSearcherManager(dir, id, infos);
       searchThread = new SearchThread("master", mgr, versionDocCounts);
       searchThread.start();
@@ -758,6 +619,7 @@ public class TestNRTReplication extends 
       // nocommit thread hazard here?  IW could have already
       // nuked some segments...?
       writer.w.incRefDeleter(lastInfos);
+      setCopyState();
 
       indexThread = new IndexThread(this);
       indexThread.start();
@@ -765,7 +627,6 @@ public class TestNRTReplication extends 
 
     public void commit() throws IOException {
       writer.w.prepareCommit();
-      //pushGlobalState(writer.w.getPendingCommit());
       // It's harmless if we crash here, because the
       // global state has already been updated
       writer.w.commit();
@@ -774,13 +635,25 @@ public class TestNRTReplication extends 
     /** Promotes an existing Replica to Master, re-using the
      *  open NRTCachingDir, the SearcherManager, the search
      *  thread, etc. */
-    public Master(Replica replica, Object[] nodes) throws IOException {
+    public Master(Replica replica) throws IOException {
       this.id = replica.id;
       this.dir = replica.dir;
-      this.nodes = nodes;
       this.mgr = replica.mgr;
       this.searchThread = replica.searchThread;
 
+      // Master may legitimately close while replicas are
+      // still copying from it:
+      ((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).setAllowCloseWithOpenFiles(true);
+
+      // Do not copy from ourself:
+      try {
+        replica.copyThread.finish();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+      replica.fileCopier.close();
+
       // nocommit must somehow "stop" this replica?  e.g. we
       // don't want it doing any more deleting?
 
@@ -790,15 +663,19 @@ public class TestNRTReplication extends 
       // nocommit also test periodically committing, and
       // preserving multiple commit points; verify these
       // "survive" over to the replica
+      SegmentInfos infos = replica.mgr.getCurrentInfos().clone();
+      TreeLogger.log("top: id=" + replica.id + " sis version=" + infos.version);
 
-      SegmentInfos curInfos = replica.mgr.getCurrentInfos().clone();
-      //pullGlobalState(curInfos);
-
-      writer = new RandomIndexWriter(random(), dir, curInfos, iwc);
+      writer = new RandomIndexWriter(random(), dir, replica.mgr.getCurrentInfos().clone(), iwc);
       _TestUtil.reduceOpenFiles(writer.w);
       System.out.println("after reduce: " + writer.w.getConfig());
 
       lastInfos = mgr.getCurrentInfos();
+
+      writer.w.incRefDeleter(lastInfos);
+
+      setCopyState();
+
       // nocommit thread hazard here?  IW could have already
       // nuked some segments...?
       writer.w.incRefDeleter(lastInfos);
@@ -823,12 +700,40 @@ public class TestNRTReplication extends 
               TreeLogger.setLogger(new TreeLogger("merge"));
             }
             //System.out.println("TEST: warm merged segment files " + info);
-            Map<String,FileMetaData> filesMetaData = getFilesMetaData(Master.this, info.files());
+            Map<String,FileMetaData> toCopy = getFilesMetaData(Master.this, info.files());
+            long totBytes = 0;
+            for(FileMetaData metaData : toCopy.values()) {
+              totBytes += metaData.length;
+            }
+            TreeLogger.log("warm files=" + toCopy.keySet() + " totBytes=" + totBytes);
+            TreeLogger.start("warmMerge");
+
+            IOContext context = new IOContext(new MergeInfo(0, totBytes, false, 0));
+
+            List<CopyResult> copyResults = new ArrayList<>();
             for(Object n : nodes) {
               if (n != null && n instanceof Replica) {
+                Replica r = (Replica) n;
+                
                 try {
-                  // nocommit do we need to check for merge aborted...?
-                  ((Replica) n).warmMerge(info.info.name, dir, filesMetaData);
+
+                  // nocommit we could also have replica pre-warm a SegmentReader
+                  // here, and add it onto subReader list
+                  // for next reopen ...?
+
+                  // Must call filter, in case we are
+                  // overwriting files and must invalidate
+                  // the last commit:
+                  int sizeBefore = toCopy.size();
+                  Map<String,FileMetaData> toCopy2 = r.filterFilesToCopy(toCopy);
+
+                  // Since this is a newly merged segment,
+                  // all files should be new and need
+                  // copying:
+                  assert toCopy.size() == sizeBefore: "before=" + toCopy.keySet() + " after=" + toCopy2.keySet();
+
+                  TreeLogger.log("copy to replica " + r.id);
+                  copyResults.add(r.copyMergedFiles(Master.this.dir, toCopy2, context));
                 } catch (AlreadyClosedException ace) {
                   // Ignore this: it means the replica shut down
                   // while we were trying to copy files.  This
@@ -838,15 +743,32 @@ public class TestNRTReplication extends 
                 }
               }
             }
+
+            TreeLogger.log("now wait for " + copyResults.size() + " replicas to finish copying");
+            for(CopyResult result : copyResults) {
+              try {
+                result.done.await();
+              } catch (InterruptedException ie) {
+                // nocommit
+              }
+
+              // nocommit if there's an error ... what to
+              // do?
+
+              // nocommit we should check merge.abort
+              // somehow in here, so if the master is in
+              // a hurry to shutdown, we respect that
+            }
+            TreeLogger.log("done warm merge for " + copyResults.size() + " replicas");
+            TreeLogger.end("warmMerge");
           }
         });
 
       return iwc;
     }
 
-    /** Gracefully shuts down the master, and returns the
-     *  final segments in the index .*/
-    public SegmentInfos close(boolean waitForMerges) throws IOException {
+    /** Gracefully shuts down the master */
+    public void close(boolean waitForMerges) throws IOException {
       TreeLogger.log("id=" + id + " close master waitForMerges=" + waitForMerges);
 
       try {
@@ -856,8 +778,6 @@ public class TestNRTReplication extends 
         throw new RuntimeException(ie);
       }
 
-      mgr.close();
-
       if (waitForMerges) {
         // Do it here, instead of on close, so we can
         // continue indexing while waiting for merges:
@@ -865,10 +785,7 @@ public class TestNRTReplication extends 
         TreeLogger.log("waitForMerges done");
       }
 
-      if (lastInfos != null) {
-        writer.w.decRefDeleter(lastInfos);
-        lastInfos = null;
-      }
+      mgr.close();
 
       try {
         indexThread.finish();
@@ -884,39 +801,100 @@ public class TestNRTReplication extends 
 
       commit();
 
-      // Don't wait for merges now; we already did above:
-      writer.close(false);
-      TreeLogger.log("done close writer");
+      synchronized(this) {
+        isClosed = true;
+        if (lastInfos != null) {
+          writer.w.decRefDeleter(lastInfos);
+          lastInfos = null;
+          copyState = null;
+        }
 
-      SegmentInfos infos = new SegmentInfos();
-      infos.read(dir);
+        // nocommit what about exc from writer.close...
+        // Don't wait for merges now; we already did above:
+        writer.close(false);
+        TreeLogger.log("done close writer");
 
-      TreeLogger.log("final infos=" + infos.toString(master.dir));
+        SegmentInfos infos = new SegmentInfos();
+        infos.read(dir);
+
+        TreeLogger.log("final infos=" + infos.toString(master.dir));
 
-      // nocommit caller must close
-      // dir.close();
-      isClosed = true;
+        // nocommit caller must close
+        // dir.close();
 
-      return infos;
+        lastInfos = infos;
+        setCopyState();
+      }
     }
 
     public synchronized boolean isClosed() {
       return isClosed;
     }
 
-    public synchronized SegmentInfos getInfos() throws IOException {
-      writer.w.incRefDeleter(lastInfos);
-      return lastInfos;
+    private CopyState copyState;
+
+    private synchronized void setCopyState() throws IOException {
+      RAMOutputStream out = new RAMOutputStream();
+      lastInfos.write(out);
+      byte[] infosBytes = new byte[(int) out.getFilePointer()];
+      out.writeTo(infosBytes, 0);
+      copyState = new CopyState(dir, 
+                                getFilesMetaData(this, lastInfos.files(dir, false)),
+                                lastInfos.version, infosBytes, lastInfos);
+    }
+
+    public synchronized CopyState getCopyState() throws IOException {
+      //if (isClosed && lastInfos == null) {
+      //return null;
+      //}
+      TreeLogger.log("master.getCopyState version=" + lastInfos.version + " files=" + lastInfos.files(dir, false) + " writer=" + writer);
+      if (isClosed == false) {
+        writer.w.incRefDeleter(copyState.infos);
+      }
+      int count = infosRefCount.incrementAndGet();
+      assert count >= 1;
+      return copyState;
+    }
+
+    /** Flushes, returns a ref with the resulting infos. */
+    public void flush() throws IOException {
+      SegmentInfos newInfos = master.writer.w.flushAndIncRef();
+
+      synchronized(this) {
+        writer.w.decRefDeleter(lastInfos);
+        // Steals the reference returned by IW:
+        lastInfos = newInfos;
+        setCopyState();
+      }
     }
 
-    // NOTE: steals incoming ref
-    public synchronized void setInfos(SegmentInfos newInfos) throws IOException {
-      writer.w.decRefDeleter(lastInfos);
-      lastInfos = newInfos;
+    public synchronized void releaseCopyState(CopyState copyState) throws IOException {
+      TreeLogger.log("master.releaseCopyState version=" + copyState.version + " files=" + copyState.files.keySet());
+      // Must check because by the time the replica releases
+      // it's possible it's a different master:
+      if (copyState.dir == dir) {
+        if (isClosed == false) {
+          writer.w.decRefDeleter(copyState.infos);
+        }
+        int count = infosRefCount.decrementAndGet();
+        assert count >= 0;
+        TreeLogger.log("  infosRefCount=" + infosRefCount);
+        if (count == 0) {
+          notify();
+        }
+      } else {
+        TreeLogger.log("  skip: wrong master");
+      }
     }
 
-    public synchronized void releaseInfos(SegmentInfos infos) throws IOException {
-      writer.w.decRefDeleter(infos);
+    /** Waits until all outstanding infos refs are dropped. */
+    public synchronized void waitIdle() throws InterruptedException {
+      while (true) {
+        if (infosRefCount.get() == 0) {
+          return;
+        }
+        wait();
+      }
     }
   }
 
@@ -959,7 +937,7 @@ public class TestNRTReplication extends 
       stop = true;
       join();
     }
-  };
+  }
 
   private static class Replica {
     final int id;
@@ -968,7 +946,9 @@ public class TestNRTReplication extends 
 
     private final InfosSearcherManager mgr;
     private volatile boolean stop;
-    private SearchThread searchThread;
+    final SearchThread searchThread;
+    final SimpleFileCopier fileCopier;
+    CopyThread copyThread;
 
     private final Collection<String> lastCommitFiles;
     private final Collection<String> lastNRTFiles;
@@ -994,210 +974,260 @@ public class TestNRTReplication extends 
       dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(fsDir, 1.0, 10.0));
       TreeLogger.log("id=" + id + " created dirs, checksums; dir.listAll=" + Arrays.toString(dir.listAll()));
 
-      // nocommit don't need copiedFiles anymore:
-      // Startup sync to pull latest index over:
-      Set<String> copiedFiles = null;
+      fileCopier = new SimpleFileCopier(dir, id);
+      fileCopier.start();
 
       lastCommitFiles = new HashSet<String>();
       lastNRTFiles = new HashSet<String>();
 
+      deleter = new InfosRefCounts(id, dir);
+
       String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir);
+      SegmentInfos lastCommit = null;
       if (segmentsFileName != null) {
-        SegmentInfos lastCommit = new SegmentInfos();
+        lastCommit = new SegmentInfos();
         TreeLogger.log("id=" + id + " " + segmentsFileName + " now load");
         lastCommit.read(dir, segmentsFileName);
         lastCommitFiles.addAll(lastCommit.files(dir, true));
-        TreeLogger.log("id=" + id + " lastCommitFiles = " + lastCommitFiles);
+        TreeLogger.log("id=" + id + " incRef lastCommitFiles");
+        deleter.incRef(lastCommitFiles);
+        TreeLogger.log("id=" + id + " loaded version=" + lastCommit.version + " lastCommitFiles = " + lastCommitFiles);
       }
 
       // Must sync latest index from master before starting
       // up mgr, so that we don't hold open any files that
       // need to be overwritten when the master is against
-      // an older index than our copy:
+      // an older index than our copy, and so we rollback
+      // our version if we had been at a higher version but
+      // were down when master moved:
       SegmentInfos infos = null;
-      assert master == null || masterLock.isLocked();
-      if (master != null && master.isClosed() == false) {
-        SegmentInfos masterInfos = null;
-        try {
-          masterInfos = master.getInfos();
-          // Convert infos to byte[], to send "on the wire":
-          RAMOutputStream out = new RAMOutputStream();
-          masterInfos.write(out);
-          byte[] infosBytes = new byte[(int) out.getFilePointer()];
-          out.writeTo(infosBytes, 0);
 
-          Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, masterInfos.files(master.dir, false));
+      // Startup sync to pull latest index over:
+      Map<String,FileMetaData> copiedFiles = null;
+
+      if (master != null) {
+
+        // nocommit this logic isn't right; else, on a full
+        // restart how will a master be found:
 
+        // Repeat until we find a working master; this is to
+        // handle the case when a replica starts up but no
+        // new master has yet been selected when moving
+        // master:
+        while (true) {
+          Master curMaster = master;
+          CopyState copyState = curMaster.getCopyState();
           try {
+            // nocommit factor this out & share w/ CopyThread:
+
+            deleter.incRef(copyState.files.keySet());
+
+            Map<String,FileMetaData> toCopy = filterFilesToCopy(copyState.files);
+            long totBytes = 0;
+            for(FileMetaData metaData : toCopy.values()) {
+              totBytes += metaData.length;
+            }
+
             // Copy files over to replica:
-            copiedFiles = copyFiles(master.dir, this, filesMetaData, false);
-          } catch (Throwable t) {
-            TreeLogger.log("id=" + id + " FAIL", t);
-            throw new RuntimeException(t);
-          }
+            copiedFiles = copyFiles(copyState.dir, this, toCopy, false, totBytes);
+            if (copiedFiles == null) {
+              TreeLogger.log("id=" + id + " startup copyFiles failed");
+              deleter.decRef(copyState.files.keySet());
+              try {
+                Thread.sleep(5);
+              } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+              }
+              continue;
+            }
       
-          // Turn byte[] back to SegmentInfos:
-          infos = new SegmentInfos();
-          infos.read(dir, new ByteArrayDataInput(infosBytes));
-          lastNRTFiles.addAll(infos.files(dir, false));
-        } finally {
-          if (masterInfos != null) {
-            master.releaseInfos(masterInfos);
+            // Turn byte[] back to SegmentInfos:
+            infos = new SegmentInfos();
+            infos.read(dir, new ByteArrayDataInput(copyState.infosBytes));
+            lastNRTFiles.addAll(copyState.files.keySet());
+          } finally {
+            curMaster.releaseCopyState(copyState);
           }
+          break;
         }
       } else {
         TreeLogger.log("id=" + id + " no master on startup; fallback to last commit");
       }
 
-      if (copiedFiles != null) {
-        // nocommit factor out & share this invalidation
-
-        // nocommit we need to do this invalidation BEFORE
-        // actually ovewriting the file?  because if we crash
-        // in between the two, we've left an invalid commit?
-
-        // If any of the files we just copied over
-        // were referenced by the last commit,
-        // then we must remove this commit point (it is now
-        // corrupt):
-        for(String fileName : copiedFiles) {
-          if (lastCommitFiles.contains(fileName)) {
-            TreeLogger.log("id=" + id + " " + segmentsFileName + " delete now corrupt commit and clear lastCommitFiles");
-            dir.deleteFile(segmentsFileName);
-            dir.deleteFile("segments.gen");
-            lastCommitFiles.clear();
-            break;
-          }
-        }
+      if (lastCommit != null) {
+        infos.updateGeneration(lastCommit);
       }
 
       mgr = new InfosSearcherManager(dir, id, infos);
 
-      deleter = new InfosRefCounts(id, dir);
-      TreeLogger.log("id=" + id + " incRef lastCommitFiles");
-      deleter.incRef(lastCommitFiles);
-      TreeLogger.log("id=" + id + ": incRef lastNRTFiles=" + lastNRTFiles);
-      deleter.incRef(lastNRTFiles);
       deleter.deleteUnknownFiles();
 
       searchThread = new SearchThread(""+id, mgr, versionDocCounts);
       searchThread.start();
-      pruneChecksums();
+
+      copyThread = new CopyThread(this);
+      copyThread.start();
+
       TreeLogger.log("done startup");
       TreeLogger.end("startup");
     }
 
-    private void pruneChecksums() {
-      TreeLogger.log("id=" + id + " now pruneChecksums");
-      TreeLogger.start("pruneChecksums");
-      Set<String> validFiles = new HashSet<String>();
-      validFiles.addAll(lastNRTFiles);
-      validFiles.addAll(lastCommitFiles);
-      TreeLogger.end("pruneChecksums");
+    /** From the incoming files, determines which ones need
+     *  copying on this replica, because they weren't yet
+     *  copied, or they were previously copied but the
+     *  checksum or file length is different. */
+    public Map<String,FileMetaData> filterFilesToCopy(Map<String,FileMetaData> files) throws IOException {
+      Map<String,FileMetaData> toCopy = new HashMap<>();
+
+      for(Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+        String fileName = ent.getKey();
+        FileMetaData metaData = ent.getValue();
+        long srcChecksum = ent.getValue().checksum;
+
+        Long checksum = dir.getChecksum(fileName);
+
+        long curLength;
+        try {
+          curLength = dir.fileLength(fileName);
+        } catch (FileNotFoundException | NoSuchFileException e) {
+          curLength = -1;
+        }
+        if (curLength != metaData.length) {
+          TreeLogger.log("id=" + id + " " + fileName + " will copy [wrong file length old=" + curLength + " new=" + metaData.length + "]");
+          toCopy.put(fileName, metaData);
+        } else if (checksum == null) {
+          TreeLogger.log("id=" + id + " " + fileName + " will copy [no checksum] length=" + metaData.length + " srcChecksum=" + srcChecksum);
+          toCopy.put(fileName, metaData);
+        } else if (checksum.longValue() != srcChecksum) {
+          TreeLogger.log("id=" + id + " " + fileName + " will copy [wrong checksum: cur=" + checksum.longValue() + " new=" + srcChecksum + "] length=" + metaData.length);
+          toCopy.put(fileName, metaData);
+        } else {
+          TreeLogger.log("id=" + id + " " + fileName + " skip copy checksum=" + srcChecksum + " file.length=" + metaData.length);
+        }
+      }
+
+      // nocommit what about multiple commit points?
+
+      // Invalidate the current commit if we will overwrite
+      // any files from it:
+
+      // nocommit do we need sync'd here?
+      for(String fileName : toCopy.keySet()) {
+        if (lastCommitFiles.contains(fileName)) {
+          TreeLogger.log("id=" + id + " " + fileName + " is being copied but is referenced by last commit; now drop last commit; lastCommitFiles=" + lastCommitFiles);
+          deleter.decRef(lastCommitFiles);
+          dir.deleteFile("segments.gen");
+          lastCommitFiles.clear();
+          break;
+        }
+      }
+
+      return toCopy;
     }
 
-    public void copyOneFile(int id, Directory src, String fileName, IOContext context) throws IOException {
+    /** Wakes this replica's CopyThread (which awaits on
+     *  copyThread.cond) so it re-checks the master for a newly
+     *  flushed segment set. */
+    public void newFlush() {
+      copyThread.lock.lock();
       try {
-        src.copy(dir, fileName, fileName, context);
-      } catch (IOException ioe) {
-        TreeLogger.log("id=" + id + " " + fileName + " failed copy1", ioe);
-        throw ioe;
+        copyThread.cond.signal();
+      } finally {
+        copyThread.lock.unlock();
       }
     }
 
-    // nocommit move this to a thread so N replicas copy at
-    // once:
-
-    public synchronized Set<String> sync(SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
-                                         long infosVersion) throws IOException {
-
+    /** Enqueues low-priority (merge-rate-limited) copies of newly
+     *  merged segment files from {@code src} via the shared
+     *  fileCopier; returns the CopyResult the caller can await. */
+    public CopyResult copyMergedFiles(SlowChecksumDirectory src, Map<String,FileMetaData> files, IOContext context) throws IOException {
       if (stop) {
         throw new AlreadyClosedException("replica closed");
       }
+      return fileCopier.add(src, files, true, context);
+    }
+
+    /** Pulls one flush point from the master: incRefs the incoming
+     *  files, filters out files already present locally with
+     *  matching length + checksum (filterFilesToCopy), copies the
+     *  rest, then — under masterLock, and only if the master has
+     *  not changed since curMasterMoveCount was sampled — installs
+     *  the new SegmentInfos and refreshes the searcher manager.
+     *  Aborts (decRef'ing the incRef'd files) if any copy fails or
+     *  the master moved. */
+    void sync(int curMasterMoveCount, SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
+              long infosVersion) throws IOException {

      SegmentInfos currentInfos = mgr.getCurrentInfos();
      TreeLogger.log("id=" + id + " sync version=" + infosVersion + " vs current version=" + currentInfos.getVersion());
      TreeLogger.start("sync");

+      // nocommit make overall test "modal", ie up front
+      // decide whether any docs are allowed to be lost
+      // (version goes backwards on replicas) or not
+ 
      /*
-      if (currentInfos != null && currentInfos.getVersion() >= infosVersion) {
+        if (currentInfos != null && currentInfos.getVersion() >= infosVersion) {
        System.out.println("  replica id=" + id + ": skip sync current version=" + currentInfos.getVersion() + " vs new version=" + infosVersion);
        return;
-      }
+        }
      */

+      // Must incRef before filter in case filter decRefs
+      // the last commit:
+      deleter.incRef(filesMetaData.keySet());
+
+      Map<String,FileMetaData> toCopy = filterFilesToCopy(filesMetaData);
+      long totBytes = 0;
+      for(FileMetaData metaData : toCopy.values()) {
+        totBytes += metaData.length;
+      }
+
      // Copy files over to replica:
-      Set<String> copiedFiles = copyFiles(master, this, filesMetaData, false);
+      Map<String,FileMetaData> copiedFiles = copyFiles(master, this, toCopy, false, totBytes);
+
+      if (copiedFiles == null) {
+        // At least one file failed to copy; skip cutover
+        TreeLogger.log("top: id=" + id + " replica sync failed; abort");
+        deleter.decRef(filesMetaData.keySet());
+        TreeLogger.end("sync");
+        return;
+      }
+
+      TreeLogger.log("top: id=" + id + " replica sync done file copy");
+
+      // OK all files copied successfully, now rebuild the
+      // infos and cutover searcher mgr

      // Turn byte[] back to SegmentInfos:
      SegmentInfos infos = new SegmentInfos();
      infos.read(dir, new ByteArrayDataInput(infosBytes));
      TreeLogger.log("id=" + id + " replica sync version=" + infos.version + " segments=" + infos.toString(dir));

-      // Delete now un-referenced files:
-      Collection<String> newFiles = infos.files(dir, false);
-      deleter.incRef(newFiles);
-      deleter.decRef(lastNRTFiles);
-      lastNRTFiles.clear();
-      lastNRTFiles.addAll(newFiles);
-
-      // nocommit factor out & share this invalidation
-
-      // nocommit we need to do this invalidation BEFORE
-      // actually ovewriting the file?  because if we crash
-      // in between the two, we've left an invalid commit?
+      masterLock.lock();
+      try {
+        if (curMasterMoveCount != masterCount.get()) {
+          // At least one file failed to copy; skip cutover
+          // NOTE(review): the comment above looks copy/pasted from
+          // the failed-copy branch — here the abort reason is that
+          // the master moved during the sync.
+          TreeLogger.log("top: id=" + id + " master moved during sync; abort");
+          deleter.decRef(filesMetaData.keySet());
+          TreeLogger.end("sync");
+          return;
+        }

-      // Invalidate the current commit if we overwrote any
-      // files from it:
-      for(String fileName : copiedFiles) {
-        if (lastCommitFiles.contains(fileName)) {
-          TreeLogger.log("id=" + id + " delete now corrupt commit and clear lastCommitFiles=" + lastCommitFiles);
-          deleter.decRef(lastCommitFiles);
-          dir.deleteFile("segments.gen");
-          lastCommitFiles.clear();
-          break;
+        synchronized (this) {
+          mgr.setCurrentInfos(infos);
+          deleter.decRef(lastNRTFiles);
+          lastNRTFiles.clear();
+          lastNRTFiles.addAll(filesMetaData.keySet());
        }
+      } finally {
+        masterLock.unlock();
      }
-      
+
      // Cutover to new searcher
-      mgr.refresh(infos);
+      mgr.maybeRefresh();

+      TreeLogger.log("top: id=" + id + " done mgr refresh");
+      
      TreeLogger.end("sync");
-      return copiedFiles;
    }
 
-    public synchronized void warmMerge(String segmentName, SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData) throws IOException {
+    /** Gracefully close & shutdown this replica. */
+    public void shutdown() throws IOException, InterruptedException {
       if (stop) {
-        throw new AlreadyClosedException("replica closed");
-      }
-      TreeLogger.log("id=" + id + " replica warm merge " + segmentName);
-      Set<String> copiedFiles = copyFiles(master, this, filesMetaData, true);
-
-      // nocommit factor out & share this invalidation
-
-      // nocommit we need to do this invalidation BEFORE
-      // actually ovewriting the file?  because if we crash
-      // in between the two, we've left an invalid commit?
-
-      // Invalidate the current commit if we overwrote any
-      // files from it:
-      for(String fileName : copiedFiles) {
-        if (lastCommitFiles.contains(fileName)) {
-          TreeLogger.log("id=" + id + " delete now corrupt commit and clear lastCommitFiles=" + lastCommitFiles);
-          deleter.decRef(lastCommitFiles);
-          lastCommitFiles.clear();
-          dir.deleteFile("segments.gen");
-          break;
-        }
+        return;
       }
+      stop = true;
 
-      // nocommit we could also pre-warm a SegmentReader
-      // here, and add it onto subReader list for next reopen ...?
-    }
+      copyThread.finish();
+      fileCopier.close();
 
-    /** Gracefully close & shutdown this replica. */
-    public synchronized void shutdown() throws IOException, InterruptedException {
-      stop = true;
       // Sometimes shutdown w/o commiting
       TreeLogger.log("id=" + id + " replica shutdown");
       TreeLogger.start("shutdown");
@@ -1221,10 +1251,15 @@ public class TestNRTReplication extends 
 
     /** Crashes the underlying directory, corrupting any
      *  un-sync'd files. */
-    public synchronized void crash() throws IOException, InterruptedException {
+    public void crash() throws IOException, InterruptedException {
+      if (stop) {
+        return;
+      }
       stop = true;
       TreeLogger.log("id=" + id + " replica crash; dir.listAll()=" + Arrays.toString(dir.listAll()));
       TreeLogger.log("id=" + id + " replica crash; fsdir.listAll()=" + Arrays.toString(((NRTCachingDirectory) dir.getDelegate()).getDelegate().listAll()));
+      copyThread.finish();
+      fileCopier.close();
       searchThread.finish();
       mgr.close();
       ((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).crash();
@@ -1233,24 +1268,45 @@ public class TestNRTReplication extends 
 
     /** Commit latest SegmentInfos (fsync'ing all referenced
      *  files). */
-    public synchronized void commit(boolean deleteAll) throws IOException {
-      SegmentInfos infos = mgr.getCurrentInfos();
+    public void commit(boolean deleteAll) throws IOException {
+      SegmentInfos infos;
+      Collection<String> newFiles;
+
+      synchronized (this) {
+        infos = mgr.getCurrentInfos();
+        if (infos != null) {
+          newFiles = infos.files(dir, false);
+          deleter.incRef(newFiles);
+        } else {
+          // nocommit is infos ever null?
+          newFiles = null;
+        }
+      }
+
       if (infos != null) {
-        TreeLogger.log("id=" + id + " commit deleteAll=" + deleteAll + "; infos.version=" + infos.getVersion() + " files=" + infos.files(dir, false) + " segs=" + infos.toString(dir));
+        TreeLogger.log("top: id=" + id + " commit deleteAll=" + deleteAll + "; infos.version=" + infos.getVersion() + " infos.files=" + newFiles + " segs=" + infos.toString(dir));
         TreeLogger.start("commit");
-        Set<String> fileNames = new HashSet<String>(infos.files(dir, false));
-        dir.sync(fileNames);
-        // Can only save those files that have been
-        // explicitly sync'd; a warmed, but not yet visible,
-        // merge cannot be sync'd:
+        dir.sync(newFiles);
         infos.commit(dir);
-        TreeLogger.log("id=" + id + " " + infos.getSegmentsFileName() + " committed");
 
-        Collection<String> newFiles = infos.files(dir, true);
-        deleter.incRef(newFiles);
+        String segmentsFileName = infos.getSegmentsFileName();
+        deleter.incRef(Collections.singletonList(segmentsFileName));
+
+        synchronized (this) {
+          // If a sync happened while we were committing, we
+          // must carry over the commit gen:
+          SegmentInfos curInfos = mgr.getCurrentInfos();
+          if (curInfos != infos) {
+            curInfos.updateGeneration(infos);
+          }
+        }
+
+        TreeLogger.log("top: id=" + id + " " + segmentsFileName + " committed; now decRef lastCommitFiles=" + lastCommitFiles);
+
         deleter.decRef(lastCommitFiles);
         lastCommitFiles.clear();
         lastCommitFiles.addAll(newFiles);
+        lastCommitFiles.add(segmentsFileName);
         TreeLogger.log("id=" + id + " " + infos.getSegmentsFileName() + " lastCommitFiles=" + lastCommitFiles);
 
         if (deleteAll) {
@@ -1258,14 +1314,407 @@ public class TestNRTReplication extends 
           // that just copied over as we closed the writer:
           TreeLogger.log("id=" + id + " now deleteUnknownFiles during commit");
           deleter.deleteUnknownFiles();
+          TreeLogger.log("id=" + id + " done deleteUnknownFiles during commit");
         }
 
-        pruneChecksums();
         TreeLogger.end("commit");
       }
     }
   }
 
+  /** Immutable description of one pending file copy: the source
+   *  directory, the file name with its expected length/checksum
+   *  metadata, the IOContext to open it with, and the shared
+   *  CopyResult whose latch is counted down when this file
+   *  finishes (or fails). */
+  static class CopyFileJob {
+    public final CopyResult result;
+    public final Directory src;
+    public final FileMetaData metaData;
+    public final String fileName;
+    public final IOContext context;
+
+    public CopyFileJob(CopyResult result, IOContext context, Directory src, String fileName, FileMetaData metaData) {
+      this.result = result;
+      this.context = context;
+      this.src = src;
+      this.fileName = fileName;
+      this.metaData = metaData;
+    }
+  }
+
+  /** Incrementally copies one file from {@code job.src} into
+   *  {@code dest}, up to 64 KB per {@link #visit} call, optionally
+   *  wrapped in a RateLimitedIndexOutput; {@link #close} verifies
+   *  the destination checksum against the metadata the master
+   *  recorded. */
+  static class CopyOneFile implements Closeable {
+    // Bytes of the source file not yet copied:
+    private long bytesLeft;
+
+    // TODO: reuse...?
+    private final byte[] buffer = new byte[65536];
+
+    private final IndexInput in;
+    private final IndexOutput out;
+    private final SlowChecksumDirectory dest;
+
+    public final CopyFileJob job;
+    // Set when a read/write in visit() threw; close() then deletes
+    // the partial file and throws:
+    public boolean failed;
+
+    public CopyOneFile(CopyFileJob job, SlowChecksumDirectory dest, RateLimiter rateLimiter) throws IOException {
+      this.job = job;
+      this.dest = dest;
+      in = job.src.openInput(job.fileName, job.context);
+      boolean success = false;
+      try {
+        IndexOutput out0 = dest.createOutput(job.fileName, job.context);
+        if (rateLimiter == null) {
+          // No IO rate limiting
+          out = out0;
+        } else {
+          out = new RateLimitedIndexOutput(rateLimiter, out0);
+        }
+        success = true;
+      } finally {
+        if (success == false) {
+          // Don't leak the already-opened input if createOutput failed:
+          IOUtils.closeWhileHandlingException(in);
+        }
+      }
+
+      bytesLeft = job.metaData.length;
+    }
+
+    /** Copies the next chunk; returns true while bytes remain.
+     *  NOTE(review): on exception this sets {@link #failed} but
+     *  still decrements bytesLeft, so callers keep visiting until
+     *  the nominal length is consumed — confirm that is intended. */
+    public boolean visit() throws IOException {
+      int chunk = bytesLeft > buffer.length ? buffer.length : (int) bytesLeft;
+      try {
+        in.readBytes(buffer, 0, chunk);
+        out.writeBytes(buffer, 0, chunk);
+      } catch (Exception e) {
+        TreeLogger.log("failed to copy " + job.fileName, e);
+        failed = true;
+      }
+      bytesLeft -= chunk;
+      return bytesLeft != 0;
+    }
+
+    /** Abort the copy. */
+    public void abort() {
+      TreeLogger.log("now abort copy file " + job.fileName);
+      try {
+        close();
+      } catch (IOException ioe) {
+        // Best effort: we are aborting anyway
+      }
+      try {
+        dest.deleteFile(job.fileName);
+      } catch (IOException ioe) {
+        // File may already have been deleted by close(); ignore
+      }
+    }
+
+    /** Closes input/output, then validates: throws IOException (and
+     *  deletes the destination file) if the copy failed mid-stream
+     *  or the destination checksum does not match the master's. */
+    @Override
+    public void close() throws IOException {
+      IOUtils.close(in, out);
+      if (failed) {
+        dest.deleteFile(job.fileName);
+        throw new IOException("copy failed");
+      }
+
+      Long actual = dest.getChecksum(job.fileName);
+      assert actual != null;
+
+      if (actual.longValue() != job.metaData.checksum) {
+        // Uh oh, bits flipped during copy:
+        dest.deleteFile(job.fileName);
+        throw new IOException("checksum mismatch");
+      }
+    }
+  }
+
+  /** Aggregated outcome of a batch of CopyFileJobs: {@code done}
+   *  reaches zero once every file has finished (copied or failed),
+   *  and {@code failed} is set if any file failed. */
+  static class CopyResult {
+    public final CountDownLatch done;
+    public final AtomicBoolean failed = new AtomicBoolean();
+
+    public CopyResult(int fileCount) {
+      done = new CountDownLatch(fileCount);
+    }
+  }
+
+  // TODO: abstract this, enable swapping in different
+  // low-level tools ... rsync, bittorrent, etc.
+
+  /** Simple class to copy low (merged segments) & high
+   *  (flushes) priority files.  It has a simplistic
+   *  "ionice" implementation: if we are copying a low-pri
+   *  (merge) file and a high-pri (flush) job shows up, then
+   *  we pause the low-pri copy and finish all high-pri
+   *  copies, then resume it. */
+
+  static class SimpleFileCopier extends Thread implements Closeable {
+
+    private final Queue<CopyFileJob> highPriorityJobs = new ConcurrentLinkedQueue<>();
+    private final Queue<CopyFileJob> lowPriorityJobs = new ConcurrentLinkedQueue<>();
+    private final SlowChecksumDirectory dest;
+
+    // nocommit make rate limit (10 MB/sec now) controllable:
+    private final RateLimiter mergeRateLimiter = new RateLimiter.SimpleRateLimiter(10.0);
+    private final int id;
+    // NOTE(review): written under synchronized (add/close/finally)
+    // but read by run()'s loop without the monitor — consider
+    // making this volatile:
+    private boolean stop;
+
+    /** We always copy files into this dest. */
+    public SimpleFileCopier(SlowChecksumDirectory dest, int id) {
+      this.dest = dest;
+      this.id = id;
+    }
+
+    // nocommit use Future here
+    /** Enqueues one CopyFileJob per file (high priority = flush,
+     *  low priority = merge) and wakes the copier thread; returns
+     *  the shared CopyResult the caller can await. */
+    public synchronized CopyResult add(SlowChecksumDirectory src, Map<String,FileMetaData> files, boolean lowPriority, IOContext context) {
+      if (stop) {
+        throw new AlreadyClosedException("closed");
+      }
+      CopyResult result = new CopyResult(files.size());
+      Queue<CopyFileJob> queue = lowPriority ? lowPriorityJobs : highPriorityJobs;
+      for(Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+        queue.add(new CopyFileJob(result, context, src, ent.getKey(), ent.getValue()));
+      }
+      notify();
+      return result;
+    }
+
+    /** Copy loop: finishes/starts high-priority (flush) jobs first,
+     *  one chunk per iteration; an in-progress low-priority (merge)
+     *  copy is implicitly paused while any high-priority work
+     *  exists.  On exit, all unfinished jobs are failed and their
+     *  latches released in the finally block. */
+    @Override
+    public void run() {
+      Thread.currentThread().setName("fileCopier id=" + id);
+      TreeLogger.setLogger(new TreeLogger("fileCopier id=" + id));
+
+      CopyOneFile curLowPri = null;
+      CopyOneFile curHighPri = null;
+
+      // NOTE(review): 'success' is assigned but never read:
+      boolean success = false;
+
+      try {
+        while (stop == false) {
+          if (curHighPri != null) {
+            if (curHighPri.visit() == false) {
+              TreeLogger.log("id=" + id + " " + curHighPri.job.fileName + " high-priority copy done");
+              // Finished copying; now close & verify checksums:
+              try {
+                curHighPri.close();
+              } catch (IOException ioe) {
+                curHighPri.job.result.failed.set(true);
+                TreeLogger.log("WARNING: id=" + id + " " + curHighPri.job.fileName + ": failed to copy high-priority file");
+              } finally {
+                curHighPri.job.result.done.countDown();
+              }
+              curHighPri = null;
+            }
+          } else if (highPriorityJobs.isEmpty() == false) {
+            // Start new high-priority copy:
+            CopyFileJob job = highPriorityJobs.poll();
+            try {
+              curHighPri = new CopyOneFile(job, dest, null);
+              TreeLogger.log("id=" + id + " " + curHighPri.job.fileName + " now start high-priority copy");
+            } catch (AlreadyClosedException ace) {
+              TreeLogger.log("id=" + id + " " + job.fileName + " skip copy: hit AlreadyClosedException");
+              job.result.failed.set(true);
+              job.result.done.countDown();
+            }
+          } else if (curLowPri != null) {
+            if (curLowPri.visit() == false) {
+              TreeLogger.log("id=" + id + " " + curLowPri.job.fileName + " low-priority copy done");
+              // Finished copying; now close & verify checksums:
+              try {
+                curLowPri.close();
+              } catch (IOException ioe) {
+                TreeLogger.log("WARNING: id=" + id + " " + curLowPri.job.fileName + ": failed to copy low-priority file");
+                curLowPri.job.result.failed.set(true);
+              } finally {
+                curLowPri.job.result.done.countDown();
+              }
+              curLowPri = null;
+            }
+          } else if (lowPriorityJobs.isEmpty() == false) {
+            // Start new low-priority copy:
+            CopyFileJob job = lowPriorityJobs.poll();
+            try {
+              curLowPri = new CopyOneFile(job, dest, mergeRateLimiter);
+              TreeLogger.log("id=" + id + " " + curLowPri.job.fileName + " now start low-priority copy");
+            } catch (AlreadyClosedException ace) {
+              TreeLogger.log("id=" + id + " " + job.fileName + " skip copy: hit AlreadyClosedException");
+              job.result.failed.set(true);
+              job.result.done.countDown();
+            }
+          } else {
+            // Wait for another job:
+            synchronized (this) {
+              if (highPriorityJobs.isEmpty() && lowPriorityJobs.isEmpty()) {
+                TreeLogger.log("id=" + id + " copy thread now idle");
+                try {
+                  // Woken by add()/close() via notify():
+                  wait();
+                } catch (InterruptedException ie) {
+                  Thread.currentThread().interrupt();
+                }
+                TreeLogger.log("id=" + id + " copy thread now wake up");
+              }
+            }
+          }
+        }
+        success = true;
+      } catch (IOException ioe) {
+        // nocommit catch each op & retry instead?
+        throw new RuntimeException(ioe);
+      } finally {
+        synchronized(this) {
+          stop = true;
+          // Abort in-flight copies and fail every queued job so no
+          // caller blocks forever on its CopyResult latch:
+          if (curLowPri != null) {
+            curLowPri.abort();
+            curLowPri.job.result.failed.set(true);
+            curLowPri.job.result.done.countDown();
+          }
+          if (curHighPri != null) {
+            curHighPri.abort();
+            curHighPri.job.result.failed.set(true);
+            curHighPri.job.result.done.countDown();
+          }
+
+          for(CopyFileJob job : highPriorityJobs) {
+            job.result.failed.set(true);
+            job.result.done.countDown();
+          }
+
+          for(CopyFileJob job : lowPriorityJobs) {
+            job.result.failed.set(true);
+            job.result.done.countDown();
+          }
+        }
+      }
+    }
+
+    // nocommit cutover all other finishes to closeable
+
+    /** Signals the copy thread to stop, wakes it if idle, and joins
+     *  it; unfinished jobs are failed in run()'s finally block. */
+    @Override      
+    public synchronized void close() throws IOException {
+      stop = true;
+      try {
+        notify();
+        join();
+      } catch (InterruptedException ie) {
+        // nocommit
+        throw new IOException(ie);
+      }
+    }
+  }
+
+  // TODO: we could pre-copy merged files out even before
+  // warmMerge is called?  e.g. if we "notice" files being
+  // written to the dir ... could give us a "head start"
+
+  /** Runs in each replica, to handle copying over new
+   *  flushes. */
+  static class CopyThread extends Thread {
+    final Lock lock;
+    final Condition cond;
+    private final Replica replica;
+    private volatile boolean stop;
+
+    public CopyThread(Replica replica) {
+      this.lock = new ReentrantLock();
+      this.cond = lock.newCondition();
+      this.stop = false;
+      this.replica = replica;
+    }
+
+    /** Poll loop: waits (on {@code cond}, signalled by
+     *  Replica.newFlush / finish) for the master to publish a
+     *  CopyState with a newer version than the replica's current
+     *  infos, then runs replica.sync for it and releases the
+     *  copyState ref. */
+    @Override
+    public void run() {
+      Thread.currentThread().setName("replica id=" + replica.id);
+      TreeLogger.setLogger(new TreeLogger("replica id=" + replica.id));
+
+      try {
+        // While loop to keep pulling newly flushed/merged
+        // segments until we shutdown
+        while (stop == false) {
+
+          try {
+
+            // While loop to pull a single new segment:
+            long curVersion = replica.mgr.getCurrentInfos().version;
+
+            // NOTE(review): newInfos is never assigned after this —
+            // dead local?
+            SegmentInfos newInfos = null;
+            CopyState copyState = null;
+            Master curMaster = null;
+            int curMasterMoveCount = -1;
+            while (stop == false) {
+              lock.lock();
+              try {
+                curMaster = master;
+                if (curMaster != null) { 
+                  copyState = curMaster.getCopyState();
+                  curMasterMoveCount = masterCount.get();
+                  if (copyState != null) {
+                    assert copyState.version >= curVersion: "copyState.version=" + copyState.version + " vs curVersion=" + curVersion;
+                    if (copyState.version > curVersion) {
+                      TreeLogger.log("got new copyState");
+                      break;
+                    } else {
+                      TreeLogger.log("skip newInfos");
+                      curMaster.releaseCopyState(copyState);
+                      copyState = null;
+                    }
+                  } else {
+                    // Master is closed
+                    Thread.sleep(5);
+                  }
+                } else {
+                  // Master hasn't started yet
+                  Thread.sleep(5);
+                }
+                // Park until newFlush()/finish() signals:
+                cond.awaitUninterruptibly();
+              } finally {
+                lock.unlock();
+              }
+            }
+
+            // We hold a ref on newInfos at this point
+
+            // nocommit what if master closes now?
+
+            if (stop) {
+              if (copyState != null) {
+                curMaster.releaseCopyState(copyState);
+              }
+              break;
+            }
+
+            // nocommit need to sometimes crash during
+            // replication; ooh: we could just Thread.interrupt()?
+
+            // nocommit needs to cross the "wire", ie turn into
+            // byte[] and back, and manage the "reservation"
+            // separately on master
+
+            try {
+              // nocommit just pass copyState
+              replica.sync(curMasterMoveCount, copyState.dir, copyState.files, copyState.infosBytes, copyState.version);
+            } finally {
+              curMaster.releaseCopyState(copyState);
+            }
+          } catch (AlreadyClosedException ace) {
+            // OK: master closed while we were replicating;
+            // we will just retry again against the next master:
+            TreeLogger.log("top: id=" + replica.id + " ignore AlreadyClosedException");
+            Thread.sleep(5);
+            continue;
+          }
+        }
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      } catch (IOException ioe) {
+        // nocommit how to properly handle...
+        throw new RuntimeException(ioe);
+      }
+    }
+
+    /** Shuts down the thread and only returns once
+     *  it's done. */
+    public void finish() throws InterruptedException {
+      stop = true;
+      lock.lock();
+      try {
+        cond.signal();
+      } finally {
+        lock.unlock();
+      }
+      join();
+    }
+  }
+
   static class SearchThread extends Thread {
     private volatile boolean stop;
     private final InfosSearcherManager mgr;
@@ -1292,7 +1741,7 @@ public class TestNRTReplication extends 
             if (totalHits > 0) {
               long version = ((DirectoryReader) s.getIndexReader()).getVersion();
               Integer expectedCount = versionDocCounts.get(version);
-              assertNotNull("searcher " + s + " is missing expected count", expectedCount);
+              assertNotNull("searcher " + s + " version=" + version + " is missing expected count", expectedCount);
               // nocommit since master may roll back in time
               // we cannot assert this:
               //assertEquals("searcher version=" + version + " replica id=" + id + " searcher=" + s, expectedCount.intValue(), totalHits);
@@ -1342,13 +1791,18 @@ public class TestNRTReplication extends 
         } else {
           refCounts.put(fileName, curCount.intValue() + 1);
         }
+
+        // Necessary in case we had tried to delete this
+        // fileName before, it failed, but then it was later
+        // overwritten:
+        pending.remove(fileName);
       }
     }
 
     public synchronized void decRef(Collection<String> fileNames) {
       for(String fileName : fileNames) {
         Integer curCount = refCounts.get(fileName);
-        assert curCount != null;
+        assert curCount != null: "fileName=" + fileName;
         assert curCount.intValue() > 0;
         if (curCount.intValue() == 1) {
           refCounts.remove(fileName);
@@ -1390,5 +1844,4 @@ public class TestNRTReplication extends 
       }
     }
   }
-
 }



Mime
View raw message