hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1417596 [5/6] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...
Date Wed, 05 Dec 2012 19:22:25 GMT
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+
+public class TestEpochsAreUnique {
+  private static final Log LOG = LogFactory.getLog(TestEpochsAreUnique.class);
+  private static final String JID = "testEpochsAreUnique-jid";
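+  // Presumably NamespaceInfo(namespaceID, clusterID, blockPoolID, cTime);
+  // the values below are arbitrary fakes, hence the FAKE_NSINFO name.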
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L);
+  private Random r = new Random();
+  
+  @Test
+  public void testSingleThreaded() throws IOException {
+    Configuration conf = new Configuration();
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    URI uri = cluster.getQuorumJournalURI(JID);
+    QuorumJournalManager qjm = new QuorumJournalManager(
+        conf, uri, FAKE_NSINFO);
+    try {
+      qjm.format(FAKE_NSINFO);
+    } finally {
+      qjm.close();
+    }
+    
+    try {
+      // With no failures or contention, epochs should increase one-by-one
+      for (int i = 0; i < 5; i++) {
+        qjm = new QuorumJournalManager(
+            conf, uri, FAKE_NSINFO);
+        try {
+          qjm.createNewUniqueEpoch();
+          assertEquals(i + 1, qjm.getLoggerSetForTests().getEpoch());
+        } finally {
+          qjm.close();
+        }
+      }
+      
+      long prevEpoch = 5;
+      // With some failures injected, it should still always increase, perhaps
+      // skipping some
+      for (int i = 0; i < 20; i++) {
+        long newEpoch = -1;
+        while (true) {
+          qjm = new QuorumJournalManager(
+              conf, uri, FAKE_NSINFO, new FaultyLoggerFactory());
+          try {
+            qjm.createNewUniqueEpoch();
+            newEpoch = qjm.getLoggerSetForTests().getEpoch();
+            break;
+          } catch (IOException ioe) {
+            // It's OK to fail to create an epoch, since we randomly inject
+            // faults. It's possible we'll inject faults in too many of the
+            // underlying nodes, and a failure is expected in that case.
+          } finally {
+            qjm.close();
+          }
+        }
+        LOG.info("Created epoch " + newEpoch);
+        assertTrue("New epoch " + newEpoch + " should be greater than previous " +
+            prevEpoch, newEpoch > prevEpoch);
+        prevEpoch = newEpoch;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private class FaultyLoggerFactory implements AsyncLogger.Factory {
+    @Override
+    public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger(
+          conf, nsInfo, journalId, addr);
+      AsyncLogger spy = Mockito.spy(ch);
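+      // Stub the spy so calls fail probabilistically: roughly 10% of
+      // getJournalState() and 40% of newEpoch() calls return a failed future.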
+      Mockito.doAnswer(new SometimesFaulty<Long>(0.10f))
+          .when(spy).getJournalState();
+      Mockito.doAnswer(new SometimesFaulty<Void>(0.40f))
+          .when(spy).newEpoch(Mockito.anyLong());
+
+      return spy;
+    }
+    
+  }
+
+  private class SometimesFaulty<T> implements Answer<ListenableFuture<T>> {
+    private float faultProbability;
+
+    public SometimesFaulty(float faultProbability) {
+      this.faultProbability = faultProbability;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ListenableFuture<T> answer(InvocationOnMock invocation)
+        throws Throwable {
+      if (r.nextFloat() < faultProbability) {
+        return Futures.immediateFailedFuture(
+            new IOException("Injected fault"));
+      }
+      return (ListenableFuture<T>)invocation.callRealMethod();
+    }
+  }
+
+
+
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
+import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Supplier;
+
+public class TestIPCLoggerChannel {
+  private static final Log LOG = LogFactory.getLog(
+      TestIPCLoggerChannel.class);
+  
+  private Configuration conf = new Configuration();
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L);
+  private static final String JID = "test-journalid";
+  private static final InetSocketAddress FAKE_ADDR =
+      new InetSocketAddress(0);
+  private static final byte[] FAKE_DATA = new byte[4096];
+  
+  private QJournalProtocol mockProxy = Mockito.mock(QJournalProtocol.class);
+  private IPCLoggerChannel ch;
+  
+  private static final int LIMIT_QUEUE_SIZE_MB = 1;
+  private static final int LIMIT_QUEUE_SIZE_BYTES =
+      LIMIT_QUEUE_SIZE_MB * 1024 * 1024;
+  
+  @Before
+  public void setupMock() {
+    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
+        LIMIT_QUEUE_SIZE_MB);
+
+    // Channel to the mock object instead of a real IPC proxy.
+    ch = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, FAKE_ADDR) {
+      @Override
+      protected QJournalProtocol getProxy() throws IOException {
+        return mockProxy;
+      }
+    };
+    
+    ch.setEpoch(1);
+  }
+  
+  @Test
+  public void testSimpleCall() throws Exception {
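+    // sendEdits(segmentTxId, firstTxnId, numTxns, data) returns a future;
+    // get() blocks until the underlying journal() RPC completes.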
+    ch.sendEdits(1, 1, 3, FAKE_DATA).get();
+    Mockito.verify(mockProxy).journal(Mockito.<RequestInfo>any(),
+        Mockito.eq(1L), Mockito.eq(1L),
+        Mockito.eq(3), Mockito.same(FAKE_DATA));
+  }
+
+  
+  /**
+   * Test that, once the queue exceeds the configured size limit,
+   * calls to journal more data are rejected.
+   */
+  @Test
+  public void testQueueLimiting() throws Exception {
+    
+    // Block the underlying fake proxy from actually completing any calls.
+    DelayAnswer delayer = new DelayAnswer(LOG);
+    Mockito.doAnswer(delayer).when(mockProxy).journal(
+        Mockito.<RequestInfo>any(),
+        Mockito.eq(1L), Mockito.eq(1L),
+        Mockito.eq(1), Mockito.same(FAKE_DATA));
+    
+    // Queue up the maximum number of calls.
+    int numToQueue = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length;
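+    // With the 1 MB limit and 4096-byte payloads, this is 1048576 / 4096 = 256 calls.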
+    for (int i = 1; i <= numToQueue; i++) {
+      ch.sendEdits(1L, (long)i, 1, FAKE_DATA);
+    }
+    
+    // The accounting should show the correct total number queued.
+    assertEquals(LIMIT_QUEUE_SIZE_BYTES, ch.getQueuedEditsSize());
+    
+    // Trying to queue any more should fail.
+    try {
+      ch.sendEdits(1L, numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS);
+      fail("Did not fail to queue more calls after queue was full");
+    } catch (ExecutionException ee) {
+      if (!(ee.getCause() instanceof LoggerTooFarBehindException)) {
+        throw ee;
+      }
+    }
+    
+    delayer.proceed();
+
+    // After we allow it to proceed, it should chug through the original queue.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return ch.getQueuedEditsSize() == 0;
+      }
+    }, 10, 1000);
+  }
+  
+  /**
+   * Test that, if the remote node gets unsynchronized (e.g., some edits were
+   * missed or the node rebooted), the client stops sending edits until
+   * the next roll. Test for HDFS-3726.
+   */
+  @Test
+  public void testStopSendingEditsWhenOutOfSync() throws Exception {
+    Mockito.doThrow(new IOException("injected error"))
+      .when(mockProxy).journal(
+        Mockito.<RequestInfo>any(),
+        Mockito.eq(1L), Mockito.eq(1L),
+        Mockito.eq(1), Mockito.same(FAKE_DATA));
+
+    try {
+      ch.sendEdits(1L, 1L, 1, FAKE_DATA).get();
+      fail("Injected JOOSE did not cause sendEdits() to throw");
+    } catch (ExecutionException ee) {
+      GenericTestUtils.assertExceptionContains("injected", ee);
+    }
+    Mockito.verify(mockProxy).journal(
+        Mockito.<RequestInfo>any(),
+        Mockito.eq(1L), Mockito.eq(1L),
+        Mockito.eq(1), Mockito.same(FAKE_DATA));
+
+    assertTrue(ch.isOutOfSync());
+    
+    try {
+      ch.sendEdits(1L, 2L, 1, FAKE_DATA).get();
+      fail("sendEdits() should throw until next roll");
+    } catch (ExecutionException ee) {
+      GenericTestUtils.assertExceptionContains("disabled until next roll",
+          ee.getCause());
+    }
+    
+    // It should have failed without even sending the edits, since it was out of sync.
+    Mockito.verify(mockProxy, Mockito.never()).journal(
+        Mockito.<RequestInfo>any(),
+        Mockito.eq(1L), Mockito.eq(2L),
+        Mockito.eq(1), Mockito.same(FAKE_DATA));
+    // It should have sent a heartbeat instead.
+    Mockito.verify(mockProxy).heartbeat(
+        Mockito.<RequestInfo>any());
+    
+    // After a roll, sending new edits should not fail.
+    ch.startLogSegment(3L).get();
+    assertFalse(ch.isOutOfSync());
+
+    ch.sendEdits(3L, 3L, 1, FAKE_DATA).get();
+  }
+}
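
The queue-limiting test above parks the mocked journal() calls with GenericTestUtils.DelayAnswer, a hadoop-common test utility whose implementation is not part of this commit. A minimal latch-based sketch of the same idea, with illustrative names only:

    import java.util.concurrent.CountDownLatch;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    // Each stubbed call blocks until the test calls proceed(), mimicking a
    // slow JournalNode so the client-side edit queue can fill up.
    class BlockingAnswer implements Answer<Object> {
      private final CountDownLatch latch = new CountDownLatch(1);

      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        latch.await();   // park the calling thread until released
        return null;     // journal() is void, so a completed call yields null
      }

      void proceed() {
        latch.countDown();  // release every blocked and future call
      }
    }

It would be wired up the same way as DelayAnswer: Mockito.doAnswer(new BlockingAnswer()).when(mockProxy).journal(...).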

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+
+
+public class TestQJMWithFaults {
+  private static final Log LOG = LogFactory.getLog(
+      TestQJMWithFaults.class);
+
+  private static final String RAND_SEED_PROPERTY =
+      "TestQJMWithFaults.random-seed";
+
+  private static final int NUM_WRITER_ITERS = 500;
+  private static final int SEGMENTS_PER_WRITER = 2;
+
+  private static Configuration conf = new Configuration();
+
+
+  static {
+    // Don't retry connections - it just slows down the tests.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    
+    // Make tests run faster by avoiding fsync()
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
+
+  // Set up fault injection mock.
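+  // The server-side journal code consults JournalFaultInjector.instance at its
+  // injection points; replacing it with a Mockito mock lets tests make those
+  // hooks throw on demand.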
+  private static JournalFaultInjector faultInjector =
+      JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class); 
+
+  /**
+   * Run through the creation of a log without any faults injected,
+   * and count how many RPCs are made to each node. This sets the
+   * bounds for the other test cases, so they can exhaustively explore
+   * the space of potential failures.
+   */
+  private static long determineMaxIpcNumber() throws Exception {
+    Configuration conf = new Configuration();
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    QuorumJournalManager qjm = null;
+    long ret;
+    try {
+      qjm = createInjectableQJM(cluster);
+      qjm.format(FAKE_NSINFO);
+      doWorkload(cluster, qjm);
+      
+      SortedSet<Integer> ipcCounts = Sets.newTreeSet();
+      for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
+        InvocationCountingChannel ch = (InvocationCountingChannel)l;
+        ch.waitForAllPendingCalls();
+        ipcCounts.add(ch.getRpcCount());
+      }
+  
+      // All of the loggers should have sent the same number of RPCs, since there
+      // were no failures.
+      assertEquals(1, ipcCounts.size());
+      
+      ret = ipcCounts.first();
+      LOG.info("Max IPC count = " + ret);
+    } finally {
+      IOUtils.closeStream(qjm);
+      cluster.shutdown();
+    }
+    return ret;
+  }
+  
+  /**
+   * Sets up two of the nodes to each drop a single RPC, at all
+   * possible combinations of RPCs. This may result in the
+   * active writer failing to write. After this point, a new writer
+   * should be able to recover and continue writing without
+   * data loss.
+   */
+  @Test
+  public void testRecoverAfterDoubleFailures() throws Exception {
+    final long MAX_IPC_NUMBER = determineMaxIpcNumber();
+    
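+    // Exhaustively try every ordered pair (failA, failB) of dropped RPCs,
+    // i.e. MAX_IPC_NUMBER^2 combinations.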
+    for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
+      for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
+        String injectionStr = "(" + failA + ", " + failB + ")";
+        
+        LOG.info("\n\n-------------------------------------------\n" +
+            "Beginning test, failing at " + injectionStr + "\n" +
+            "-------------------------------------------\n\n");
+        
+        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+          .build();
+        QuorumJournalManager qjm = null;
+        try {
+          qjm = createInjectableQJM(cluster);
+          qjm.format(FAKE_NSINFO);
+          List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
+          failIpcNumber(loggers.get(0), failA);
+          failIpcNumber(loggers.get(1), failB);
+          int lastAckedTxn = doWorkload(cluster, qjm);
+
+          if (lastAckedTxn < 6) {
+            LOG.info("Failed after injecting failures at " + injectionStr + 
+                ". This is expected since we injected a failure in the " +
+                "majority.");
+          }
+          qjm.close();
+          qjm = null;
+
+          // Now should be able to recover
+          qjm = createInjectableQJM(cluster);
+          long lastRecoveredTxn = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+          assertTrue(lastRecoveredTxn >= lastAckedTxn);
+          
+          writeSegment(cluster, qjm, lastRecoveredTxn + 1, 3, true);
+        } catch (Throwable t) {
+          // Test failure! Rethrow with the test setup info so it can be
+          // easily triaged.
+          throw new RuntimeException("Test failed with injection: " + injectionStr,
+                t); 
+        } finally {
+          cluster.shutdown();
+          cluster = null;
+          IOUtils.closeStream(qjm);
+          qjm = null;
+        }
+      }
+    }
+  }
+  
+  /**
+   * Test case in which three JournalNodes randomly flip flop between
+   * up and down states every time they get an RPC.
+   * 
+   * The writer keeps track of the latest ACKed edit, and on every
+   * recovery operation, ensures that it recovers at least to that
+   * point or higher. Since at any given point, a majority of JNs
+   * may be injecting faults, any writer operation is allowed to fail,
+   * so long as the exception message indicates it failed due to injected
+   * faults.
+   * 
+   * Given a random seed, the test should be entirely deterministic.
+   */
+  @Test
+  public void testRandomized() throws Exception {
+    long seed;
+    Long userSpecifiedSeed = Long.getLong(RAND_SEED_PROPERTY);
+    if (userSpecifiedSeed != null) {
+      LOG.info("Using seed specified in system property");
+      seed = userSpecifiedSeed;
+      
+      // If the user specifies a seed, then we should gather all the
+      // IPC trace information so that debugging is easier. This tracing
+      // slows the test down by about 25%, so it is enabled only when a
+      // seed is given.
+      ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+    } else {
+      seed = new Random().nextLong();
+    }
+    LOG.info("Random seed: " + seed);
+    
+    Random r = new Random(seed);
+    
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+      .build();
+    
+    // Format the cluster using a non-faulty QJM.
+    QuorumJournalManager qjmForInitialFormat =
+        createInjectableQJM(cluster);
+    qjmForInitialFormat.format(FAKE_NSINFO);
+    qjmForInitialFormat.close();
+    
+    try {
+      long txid = 0;
+      long lastAcked = 0;
+      
+      for (int i = 0; i < NUM_WRITER_ITERS; i++) {
+        LOG.info("Starting writer " + i + "\n-------------------");
+        
+        QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r);
+        try {
+          long recovered;
+          try {
+            recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+          } catch (Throwable t) {
+            LOG.info("Failed recovery", t);
+            checkException(t);
+            continue;
+          }
+          assertTrue("Recovered only up to txnid " + recovered +
+              " but had gotten an ack for " + lastAcked,
+              recovered >= lastAcked);
+          
+          txid = recovered + 1;
+          
+          // Periodically purge old data on disk so it's easier to look
+          // at failure cases.
+          if (txid > 100 && i % 10 == 1) {
+            qjm.purgeLogsOlderThan(txid - 100);
+          }
+
+          Holder<Throwable> thrown = new Holder<Throwable>(null);
+          for (int j = 0; j < SEGMENTS_PER_WRITER; j++) {
+            lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
+            if (thrown.held != null) {
+              LOG.info("Failed write", thrown.held);
+              checkException(thrown.held);
+              break;
+            }
+            txid += 4;
+          }
+        } finally {
+          qjm.close();
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void checkException(Throwable t) {
+    GenericTestUtils.assertExceptionContains("Injected", t);
+    if (t.toString().contains("AssertionError")) {
+      throw new RuntimeException("Should never see AssertionError in fault test!",
+          t);
+    }
+  }
+
+  private long writeSegmentUntilCrash(MiniJournalCluster cluster,
+      QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
+    
+    long firstTxId = txid;
+    long lastAcked = txid - 1;
+    try {
+      EditLogOutputStream stm = qjm.startLogSegment(txid);
+      
+      for (int i = 0; i < numTxns; i++) {
+        QJMTestUtil.writeTxns(stm, txid++, 1);
+        lastAcked++;
+      }
+      
+      stm.close();
+      qjm.finalizeLogSegment(firstTxId, lastAcked);
+    } catch (Throwable t) {
+      thrown.held = t;
+    }
+    return lastAcked;
+  }
+
+  /**
+   * Run a simple workload of becoming the active writer and writing
+   * two log segments: 1-3 and 4-6.
+   */
+  private static int doWorkload(MiniJournalCluster cluster,
+      QuorumJournalManager qjm) throws IOException {
+    int lastAcked = 0;
+    try {
+      qjm.recoverUnfinalizedSegments();
+      writeSegment(cluster, qjm, 1, 3, true);
+      lastAcked = 3;
+      writeSegment(cluster, qjm, 4, 3, true);
+      lastAcked = 6;
+    } catch (QuorumException qe) {
+      LOG.info("Failed to write at txid " + lastAcked,
+          qe);
+    }
+    return lastAcked;
+  }
+
+  /**
+   * Inject a failure at the given IPC number, such that the JN never
+   * receives the RPC. The client side sees an IOException. Future
+   * IPCs after this number will be received as usual.
+   */
+  private void failIpcNumber(AsyncLogger logger, int idx) {
+    ((InvocationCountingChannel)logger).failIpcNumber(idx);
+  }
+  
+  private static class RandomFaultyChannel extends IPCLoggerChannel {
+    private final Random random;
+    private float injectionProbability = 0.1f;
+    private boolean isUp = true;
+    
+    public RandomFaultyChannel(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr, long seed) {
+      super(conf, nsInfo, journalId, addr);
+      this.random = new Random(seed);
+    }
+
+    @Override
+    protected QJournalProtocol createProxy() throws IOException {
+      QJournalProtocol realProxy = super.createProxy();
+      return mockProxy(
+          new WrapEveryCall<Object>(realProxy) {
+            @Override
+            void beforeCall(InvocationOnMock invocation) throws Exception {
+              if (random.nextFloat() < injectionProbability) {
+                isUp = !isUp;
+                LOG.info("transitioned " + addr + " to " +
+                    (isUp ? "up" : "down"));
+              }
+    
+              if (!isUp) {
+                throw new IOException("Injected - faking being down");
+              }
+              
+              if (invocation.getMethod().getName().equals("acceptRecovery")) {
+                if (random.nextFloat() < injectionProbability) {
+                  Mockito.doThrow(new IOException(
+                      "Injected - faking fault before persisting paxos data"))
+                      .when(faultInjector).beforePersistPaxosData();
+                } else if (random.nextFloat() < injectionProbability) {
+                  Mockito.doThrow(new IOException(
+                      "Injected - faking fault after persisting paxos data"))
+                      .when(faultInjector).afterPersistPaxosData();
+                }
+              }
+            }
+            
+            @Override
+            public void afterCall(InvocationOnMock invocation, boolean succeeded) {
+              Mockito.reset(faultInjector);
+            }
+          });
+    }
+
+    @Override
+    protected ExecutorService createExecutor() {
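+      // Run logger calls inline on the calling thread so fault injection
+      // stays deterministic for a given seed.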
+      return MoreExecutors.sameThreadExecutor();
+    }
+  }
+
+  private static class InvocationCountingChannel extends IPCLoggerChannel {
+    private int rpcCount = 0;
+    private Map<Integer, Callable<Void>> injections = Maps.newHashMap();
+    
+    public InvocationCountingChannel(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      super(conf, nsInfo, journalId, addr);
+    }
+    
+    int getRpcCount() {
+      return rpcCount;
+    }
+    
+    void failIpcNumber(final int idx) {
+      Preconditions.checkArgument(idx > 0,
+          "id must be positive");
+      inject(idx, new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          throw new IOException("injected failed IPC at " + idx);
+        }
+      });
+    }
+    
+    private void inject(int beforeRpcNumber, Callable<Void> injectedCode) {
+      injections.put(beforeRpcNumber, injectedCode);
+    }
+
+    @Override
+    protected QJournalProtocol createProxy() throws IOException {
+      final QJournalProtocol realProxy = super.createProxy();
+      QJournalProtocol mock = mockProxy(
+          new WrapEveryCall<Object>(realProxy) {
+            void beforeCall(InvocationOnMock invocation) throws Exception {
+              rpcCount++;
+              String callStr = "[" + addr + "] " + 
+                  invocation.getMethod().getName() + "(" +
+                  Joiner.on(", ").join(invocation.getArguments()) + ")";
+ 
+              Callable<Void> inject = injections.get(rpcCount);
+              if (inject != null) {
+                LOG.info("Injecting code before IPC #" + rpcCount + ": " +
+                    callStr);
+                inject.call();
+              } else {
+                LOG.info("IPC call #" + rpcCount + ": " + callStr);
+              }
+            }
+          });
+      return mock;
+    }
+  }
+
+
+  private static QJournalProtocol mockProxy(WrapEveryCall<Object> wrapper)
+      throws IOException {
+    QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+        Mockito.withSettings()
+          .defaultAnswer(wrapper)
+          .extraInterfaces(Closeable.class));
+    return mock;
+  }
+
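+  /**
+   * Forwards each invocation to the wrapped real object, exposing
+   * before/after hooks that subclasses use to inject faults.
+   */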
+  private static abstract class WrapEveryCall<T> implements Answer<T> {
+    private final Object realObj;
+    WrapEveryCall(Object realObj) {
+      this.realObj = realObj;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public T answer(InvocationOnMock invocation) throws Throwable {
+      // Don't want to inject an error on close() since that isn't
+      // actually an IPC call!
+      if (!Closeable.class.equals(
+            invocation.getMethod().getDeclaringClass())) {
+        beforeCall(invocation);
+      }
+      boolean success = false;
+      try {
+        T ret = (T) invocation.getMethod().invoke(realObj,
+          invocation.getArguments());
+        success = true;
+        return ret;
+      } catch (InvocationTargetException ite) {
+        throw ite.getCause();
+      } finally {
+        afterCall(invocation, success);
+      }
+    }
+
+    abstract void beforeCall(InvocationOnMock invocation) throws Exception;
+    void afterCall(InvocationOnMock invocation, boolean succeeded) {}
+  }
+  
+  private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
+      throws IOException, URISyntaxException {
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
+      }
+    };
+    return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+        FAKE_NSINFO, spyFactory);
+  }
+  
+  private static QuorumJournalManager createRandomFaultyQJM(
+      MiniJournalCluster cluster, final Random seedGenerator)
+          throws IOException, URISyntaxException {
+    
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return new RandomFaultyChannel(conf, nsInfo, journalId, addr,
+            seedGenerator.nextLong());
+      }
+    };
+    return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+        FAKE_NSINFO, spyFactory);
+  }
+
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdfs.qjournal.client.QuorumCall;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.SettableFuture;
+
+public class TestQuorumCall {
+  @Test(timeout=10000)
+  public void testQuorums() throws Exception {
+    Map<String, SettableFuture<String>> futures = ImmutableMap.of(
+        "f1", SettableFuture.<String>create(),
+        "f2", SettableFuture.<String>create(),
+        "f3", SettableFuture.<String>create());
+    
+    QuorumCall<String, String> q = QuorumCall.create(futures);
+    assertEquals(0, q.countResponses());
+    
+    futures.get("f1").set("first future");
+    q.waitFor(1, 0, 0, 100000, "test"); // wait for 1 response
+    q.waitFor(0, 1, 0, 100000, "test"); // wait for 1 success
+    assertEquals(1, q.countResponses());
+    
+    
+    futures.get("f2").setException(new Exception("error"));
+    assertEquals(2, q.countResponses());
+    
+    futures.get("f3").set("second future");
+    q.waitFor(3, 0, 100, 100000, "test"); // wait for 3 responses
+    q.waitFor(0, 2, 100, 100000, "test"); // 2 successes
+
+    assertEquals(3, q.countResponses());
+    assertEquals("f1=first future,f3=second future",
+        Joiner.on(",").withKeyValueSeparator("=").join(
+            new TreeMap<String, String>(q.getResults())));
+    
+    try {
+      q.waitFor(0, 4, 100, 10, "test");
+      fail("Didn't time out waiting for more responses than came back");
+    } catch (TimeoutException te) {
+      // expected
+    }
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,941 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
+import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Stubber;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Functional tests for QuorumJournalManager.
+ * For true unit tests, see {@link TestQuorumJournalManagerUnit}.
+ */
+public class TestQuorumJournalManager {
+  private static final Log LOG = LogFactory.getLog(
+      TestQuorumJournalManager.class);
+  
+  private MiniJournalCluster cluster;
+  private Configuration conf;
+  private QuorumJournalManager qjm;
+  private List<AsyncLogger> spies;
+  
+  static {
+    ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    // Don't retry connections - it just slows down the tests.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    
+    cluster = new MiniJournalCluster.Builder(conf)
+      .build();
+    
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+
+    qjm.format(QJMTestUtil.FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
+  }
+  
+  @After
+  public void shutdown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void testSingleWriter() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, true);
+    
+    // Should be finalized
+    checkRecovery(cluster, 1, 3);
+    
+    // Start a new segment
+    writeSegment(cluster, qjm, 4, 1, true);
+
+    // Should be finalized
+    checkRecovery(cluster, 4, 4);
+  }
+  
+  @Test
+  public void testFormat() throws Exception {
+    QuorumJournalManager qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO);
+    assertFalse(qjm.hasSomeData());
+    qjm.format(FAKE_NSINFO);
+    assertTrue(qjm.hasSomeData());
+  }
+  
+  @Test
+  public void testReaderWhileAnotherWrites() throws Exception {
+    
+    QuorumJournalManager readerQjm = createSpyingQJM();
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    readerQjm.selectInputStreams(streams, 0, false);
+    assertEquals(0, streams.size());
+    writeSegment(cluster, qjm, 1, 3, true);
+
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(1, streams.size());
+      // Validate the actual stream contents.
+      EditLogInputStream stream = streams.get(0);
+      assertEquals(1, stream.getFirstTxId());
+      assertEquals(3, stream.getLastTxId());
+      
+      verifyEdits(streams, 1, 3);
+      assertNull(stream.readOp());
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+    
+    // Ensure correct results when there is a stream in-progress, but we don't
+    // ask for in-progress.
+    writeSegment(cluster, qjm, 4, 3, false);
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(1, streams.size());
+      EditLogInputStream stream = streams.get(0);
+      assertEquals(1, stream.getFirstTxId());
+      assertEquals(3, stream.getLastTxId());
+      verifyEdits(streams, 1, 3);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+    
+    // TODO: check results for selectInputStreams with inProgressOK = true.
+    // This doesn't currently work, due to a bug where RedundantEditInputStream
+    // throws an exception if there are any unvalidated in-progress edits in the list!
+    // But, it shouldn't be necessary for current use cases.
+    
+    qjm.finalizeLogSegment(4, 6);
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(2, streams.size());
+      assertEquals(4, streams.get(1).getFirstTxId());
+      assertEquals(6, streams.get(1).getLastTxId());
+
+      verifyEdits(streams, 1, 6);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+  }
+  
+  /**
+   * Regression test for HDFS-3725. One of the journal nodes is down
+   * during the writing of one segment, then comes back up later to
+   * take part in a later segment. Thus, its local edits are
+   * not a contiguous sequence. This should be handled correctly.
+   */
+  @Test
+  public void testOneJNMissingSegments() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(0).stopAndJoin(0);
+    writeSegment(cluster, qjm, 4, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.restartJournalNode(0);
+    writeSegment(cluster, qjm, 7, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(1).stopAndJoin(0);
+    
+    QuorumJournalManager readerQjm = createSpyingQJM();
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    try {
+      readerQjm.selectInputStreams(streams, 1, false);
+      verifyEdits(streams, 1, 9);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      readerQjm.close();
+    }
+  }
+  
+  /**
+   * Regression test for HDFS-3891: selectInputStreams should throw
+   * an exception when a majority of journalnodes have crashed.
+   */
+  @Test
+  public void testSelectInputStreamsMajorityDown() throws Exception {
+    // Shut down all of the JNs.
+    cluster.shutdown();
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    try {
+      qjm.selectInputStreams(streams, 0, false);
+      fail("Did not throw IOE");
+    } catch (QuorumException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Got too many exceptions", ioe);
+      assertTrue(streams.isEmpty());
+    }
+  }
+  
+  /**
+   * Test the case where the NN crashes after starting a new segment
+   * on all nodes, but before writing the first transaction to it.
+   */
+  @Test
+  public void testCrashAtBeginningOfSegment() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    } finally {
+      stm.abort();
+    }
+    
+    
+    // Make a new QJM
+    qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 3);
+
+    writeSegment(cluster, qjm, 4, 3, true);
+  }
+  
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment0() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(0);
+  }
+  
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment1() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(1);
+  }
+
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment2() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(2);
+  }
+  
+  /**
+   * Test the case where, at the beginning of a segment, transactions
+   * have been written to one JN but not others.
+   */
+  public void doTestOutOfSyncAtBeginningOfSegment(int nodeWithOneTxn)
+      throws Exception {
+    
+    int nodeWithEmptySegment = (nodeWithOneTxn + 1) % 3;
+    int nodeMissingSegment = (nodeWithOneTxn + 2) % 3;
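+    // e.g., nodeWithOneTxn == 1 yields nodeWithEmptySegment == 2 and
+    // nodeMissingSegment == 0; across the three test runs every role rotates.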
+    
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(nodeMissingSegment).stopAndJoin(0);
+    
+    // Open segment on 2/3 nodes
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      waitForAllPendingCalls(qjm.getLoggerSetForTests());
+      
+      // Write transactions to only 1/3 nodes
+      failLoggerAtTxn(spies.get(nodeWithEmptySegment), 4);
+      try {
+        writeTxns(stm, 4, 1);
+        fail("Did not fail even though 2/3 failed");
+      } catch (QuorumException qe) {
+        GenericTestUtils.assertExceptionContains("mock failure", qe);
+      }
+    } finally {
+      stm.abort();
+    }
+    
+    // Bring back the down JN.
+    cluster.restartJournalNode(nodeMissingSegment);
+    
+    // Make a new QJM. At this point, the state is as follows:
+    // A: nodeWithEmptySegment: 1-3 finalized, 4_inprogress (empty)    
+    // B: nodeWithOneTxn:       1-3 finalized, 4_inprogress (1 txn)
+    // C: nodeMissingSegment:   1-3 finalized
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeWithEmptySegment, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3),
+        NNStorage.getInProgressEditsFileName(4));
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeWithOneTxn, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3),
+        NNStorage.getInProgressEditsFileName(4));
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeMissingSegment, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3));
+    
+
+    // Stop one of the nodes. Since we run this test three
+    // times, rotating the roles of the nodes, we'll test
+    // all the permutations.
+    cluster.getJournalNode(2).stopAndJoin(0);
+  
+    qjm = createSpyingQJM();
+    qjm.recoverUnfinalizedSegments();
+    
+    if (nodeWithOneTxn == 0 ||
+        nodeWithOneTxn == 1) {
+      // If the node that had the transaction committed was one of the nodes
+      // that responded during recovery, then we should have recovered txid
+      // 4.
+      checkRecovery(cluster, 4, 4);
+      writeSegment(cluster, qjm, 5, 3, true);
+    } else {
+      // Otherwise, we should have recovered only 1-3 and should be able to
+      // start a segment at 4.
+      checkRecovery(cluster, 1, 3);
+      writeSegment(cluster, qjm, 4, 3, true);
+    }
+  }
+
+  
+  /**
+   * Test case where a new writer picks up from an old one with no failures
+   * and the previous unfinalized segment is entirely consistent -- i.e. all
+   * the JournalNodes end at the same transaction ID.
+   */
+  @Test
+  public void testChangeWritersLogsInSync() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, false);
+    QJMTestUtil.assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(1));
+
+    // Make a new QJM
+    qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 3);
+  }
+  
+  /**
+   * Test case where a new writer picks up from an old one which crashed
+   * with the three loggers at different txnids
+   */
+  @Test
+  public void testChangeWritersLogsOutOfSync1() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [x, 4, 5]
+    // Should recover to txn 5
+    doOutOfSyncTest(0, 5L);
+  }
+
+  @Test
+  public void testChangeWritersLogsOutOfSync2() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [3, x, 5]
+    // Should recover to txn 5
+    doOutOfSyncTest(1, 5L);
+  }
+
+  @Test
+  public void testChangeWritersLogsOutOfSync3() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [3, 4, x]
+    // Should recover to txn 4
+    doOutOfSyncTest(2, 4L);
+  }
+
+  
+  private void doOutOfSyncTest(int missingOnRecoveryIdx,
+      long expectedRecoveryTxnId) throws Exception {
+    setupLoggers345();
+    
+    QJMTestUtil.assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(1));
+
+    // Shut down the specified JN, so it's not present during recovery.
+    cluster.getJournalNode(missingOnRecoveryIdx).stopAndJoin(0);
+
+    // Make a new QJM
+    qjm = createSpyingQJM();
+    
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, expectedRecoveryTxnId);
+  }
+  
+  
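+  // Stub the given spy so the sendEdits() batch starting at txid fails with
+  // a mock IOException, simulating a logger that misses those transactions.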
+  private void failLoggerAtTxn(AsyncLogger spy, long txid) {
+    TestQuorumJournalManagerUnit.futureThrows(new IOException("mock failure"))
+      .when(spy).sendEdits(Mockito.anyLong(),
+        Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
+  }
+  
+  /**
+   * Test the case where one of the loggers misses a finalizeLogSegment()
+   * call, and then misses the next startLogSegment() call before coming
+   * back to life.
+   * 
+   * Previously, this caused it to keep on writing to the old log segment,
+   * such that one logger had, e.g., edits_1-10 while the others had edits_1-5 and
+   * edits_6-10. This caused recovery to fail in certain cases.
+   */
+  @Test
+  public void testMissFinalizeAndNextStart() throws Exception {
+    
+    // Logger 0: miss finalize(1-3) and start(4)
+    futureThrows(new IOException("injected")).when(spies.get(0))
+      .finalizeLogSegment(Mockito.eq(1L), Mockito.eq(3L));
+    futureThrows(new IOException("injected")).when(spies.get(0))
+      .startLogSegment(Mockito.eq(4L));
+    
+    // Logger 1: fail at txn id 4
+    failLoggerAtTxn(spies.get(1), 4L);
+    
+    writeSegment(cluster, qjm, 1, 3, true);
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      writeTxns(stm, 4, 1);
+      fail("Did not fail to write");
+    } catch (QuorumException qe) {
+      // Should fail, because logger 1 had an injected fault and
+      // logger 0 should detect writer out of sync
+      GenericTestUtils.assertExceptionContains("Writer out of sync",
+          qe);
+    } finally {
+      stm.abort();
+      qjm.close();
+    }
+    
+    // State:
+    // Logger 0: 1-3 in-progress (since it missed finalize)
+    // Logger 1: 1-3 finalized
+    // Logger 2: 1-3 finalized, 4 in-progress with one txn
+    
+    // Shut down logger 2 so it doesn't participate in recovery
+    cluster.getJournalNode(2).stopAndJoin(0);
+    
+    qjm = createSpyingQJM();
+    long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+    assertEquals(3L, recovered);
+  }
+  
+  /**
+   * edit lengths [3,4,5]
+   * first recovery:
+   * - sees [3,4,x]
+   * - picks length 4 for recoveryEndTxId
+   * - calls acceptRecovery()
+   * - crashes before finalizing
+   * second recovery:
+   * - sees [x, 4, 5]
+   * - should pick recovery length 4, even though it saw
+   *   a larger txid, because a previous recovery accepted it
+   */
+  @Test
+  public void testRecoverAfterIncompleteRecovery() throws Exception {
+    setupLoggers345();
+
+    // Shut down the logger that has length = 5
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+
+    // Allow no logger to finalize
+    for (AsyncLogger spy : spies) {
+      TestQuorumJournalManagerUnit.futureThrows(new IOException("injected"))
+        .when(spy).finalizeLogSegment(Mockito.eq(1L),
+            Mockito.eq(4L));
+    }
+    try {
+      qjm.recoverUnfinalizedSegments();
+      fail("Should have failed recovery since no finalization occurred");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("injected", ioe);
+    }
+    
+    // Now bring back the logger that had 5, and run recovery again.
+    // We should recover to 4, even though there's a longer log.
+    cluster.getJournalNode(0).stopAndJoin(0);
+    cluster.restartJournalNode(2);
+    
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 4);
+  }
+  
+  /**
+   * Set up the loggers into the following state:
+   * - JN0: edits 1-3 in progress
+   * - JN1: edits 1-4 in progress
+   * - JN2: edits 1-5 in progress
+   * 
+   * None of the loggers have any associated paxos info.
+   */
+  private void setupLoggers345() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(1);
+    
+    failLoggerAtTxn(spies.get(0), 4);
+    failLoggerAtTxn(spies.get(1), 5);
+    
+    writeTxns(stm, 1, 3);
+    
+    // This should succeed to 2/3 loggers
+    writeTxns(stm, 4, 1);
+    
+    // This should only succeed to 1 logger (index 2). Hence it should
+    // fail
+    try {
+      writeTxns(stm, 5, 1);
+      fail("Did not fail to write when only a minority succeeded");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3",
+          qe);
+    }
+  }
+
+  /**
+   * Set up the following tricky edge case state which is used by
+   * multiple tests:
+   * 
+   * Initial writer:
+   * - Writing to 3 JNs: JN0, JN1, JN2:
+   * - A log segment with txnid 1 through 100 succeeds.
+   * - The first transaction in the next segment only goes to JN0
+   *   before the writer crashes (e.g., it is partitioned)
+   *   
+   * Recovery by another writer:
+   * - The new NN starts recovery and talks to all three. Thus, it sees
+   *   that the newest log segment which needs recovery is 101.
+   * - It sends the prepareRecovery(101) call, and decides that the
+   *   recovery length for 101 is only the 1 transaction.
+   * - It sends acceptRecovery(101-101) to only JN0, before crashing
+   * 
+   * This yields the following state:
+   * - JN0: 1-100 finalized, 101_inprogress, accepted recovery: 101-101
+   * - JN1: 1-100 finalized, 101_inprogress.empty
+   * - JN2: 1-100 finalized, 101_inprogress.empty
+   *  (the .empty files got moved aside during recovery)
+   * @throws Exception 
+   */
+  private void setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery() throws Exception {
+    // Log segment with txns 1-100 succeeds 
+    writeSegment(cluster, qjm, 1, 100, true);
+
+    // startLogSegment only makes it to one of the three nodes
+    failLoggerAtTxn(spies.get(1), 101);
+    failLoggerAtTxn(spies.get(2), 101);
+    
+    try {
+      writeSegment(cluster, qjm, 101, 1, true);
+      fail("Should have failed");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("mock failure", qe);
+    } finally {
+      qjm.close();
+    }
+    
+    // Recovery 1:
+    // Allow acceptRecovery() to reach only the node that has txid 101.
+    // This should fail because only 1/3 of the nodes accepted the recovery.
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    futureThrows(new IOException("mock failure")).when(spies.get(1))
+      .acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+    futureThrows(new IOException("mock failure")).when(spies.get(2))
+      .acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+    
+    try {
+      qjm.recoverUnfinalizedSegments();
+      fail("Should have failed to recover");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("mock failure", qe);
+    } finally {
+      qjm.close();
+    }
+    
+    // Check that we have entered the expected state as described in the
+    // method javadoc.
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(0, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101));
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(1, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101) + ".empty");
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(2, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101) + ".empty");
+
+    File paxos0 = new File(cluster.getCurrentDir(0, JID), "paxos");
+    File paxos1 = new File(cluster.getCurrentDir(1, JID), "paxos");
+    File paxos2 = new File(cluster.getCurrentDir(2, JID), "paxos");
+    
+    GenericTestUtils.assertGlobEquals(paxos0, ".*", "101");
+    GenericTestUtils.assertGlobEquals(paxos1, ".*");
+    GenericTestUtils.assertGlobEquals(paxos2, ".*");
+  }
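+
+  // Editorial sketch, not part of the original commit: the on-disk layout
+  // the glob assertions above rely on. Accepted recovery decisions are
+  // persisted under <current>/paxos/<segmentTxId>, which is why JN0, the
+  // only node whose acceptRecovery(101) succeeded, ends up with a file
+  // named "101". The helper name is illustrative.
+  private static File illustrativePaxosFile(File currentDir, long segmentTxId) {
+    return new File(new File(currentDir, "paxos"), String.valueOf(segmentTxId));
+  }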
+  
+  /**
+   * Test an edge case discovered by randomized testing.
+   * 
+   * Starts with the edge case state set up by
+   * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}
+   * 
+   * Recovery 2:
+   * - New NN starts recovery and only talks to JN1 and JN2. JN0 has
+   *   crashed. Since they have no logs open, they say they don't need
+   *   recovery.
+   * - Starts writing segment 101, and writes 50 transactions before crashing.
+   *
+   * Recovery 3:
+   * - JN0 has come back to life.
+   * - New NN starts recovery and talks to all three. All three have
+   *   segments open from txid 101, so it calls prepareRecovery(101)
+   * - JN0 has an already-accepted value for segment 101, so it replies
+   *   "you should recover 101-101"
+   * - Former incorrect behavior: the NN truncated the logs to txid 101
+   *   even though it should have recovered through 150.
+   *   
+   * In this case, even though there is an accepted recovery decision,
+   * the newer log segments should take precedence, since they were written
+   * in a newer epoch than the recorded decision.
+   */
+  @Test
+  public void testNewerVersionOfSegmentWins() throws Exception {
+    setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
+    
+    // Now start writing again without JN0 present:
+    cluster.getJournalNode(0).stopAndJoin(0);
+    
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+      
+      // Write segment but do not finalize
+      writeSegment(cluster, qjm, 101, 50, false);
+    } finally {
+      qjm.close();
+    }
+    
+    // Now try to recover a new writer, with JN0 present,
+    // and ensure that all of the above-written transactions are recovered.
+    cluster.restartJournalNode(0);
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+    } finally {
+      qjm.close();
+    }
+  }
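+
+  // Editorial sketch, not part of the original commit: the epoch comparison
+  // at the heart of the fix tested above. A segment written by a later
+  // writer epoch supersedes a recovery decision accepted in an earlier
+  // epoch, so the 50 newer txns win over JN0's stale 101-101 acceptance.
+  // The method and parameter names are illustrative.
+  private static boolean illustrativeSegmentSupersedesDecision(
+      long segmentLastWriterEpoch, long decisionAcceptedInEpoch) {
+    return segmentLastWriterEpoch > decisionAcceptedInEpoch;
+  }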
+  
+  /**
+   * Test another edge case discovered by randomized testing.
+   * 
+   * Starts with the edge case state set up by
+   * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}
+   * 
+   * Recovery 2:
+   * - New NN starts recovery and only talks to JN1 and JN2. JN0 has
+   *   crashed. Since they have no logs open, they say they don't need
+   *   recovery.
+   * - Before writing any transactions, JN0 comes back to life and
+   *   JN1 crashes.
+   * - Starts writing segment 101, and writes 50 transactions before crashing.
+   *
+   * Recovery 3:
+   * - JN1 has come back to life. JN2 crashes.
+   * - New NN starts recovery and talks to all three. All three have
+   *   segments open from txid 101, so it calls prepareRecovery(101)
+   * - JN0 has an already-accepted value for segment 101, so it replies
+   *   "you should recover 101-101"
+   * - Former incorrect behavior: the NN truncated the logs to txid 101
+   *   even though it should have recovered through 150.
+   *   
+   * In this case, even though there is an accepted recovery decision,
+   * the newer log segments should take precedence, since they were written
+   * in a newer epoch than the recorded decision.
+   */
+  @Test
+  public void testNewerVersionOfSegmentWins2() throws Exception {
+    setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
+
+    // Recover without JN0 present.
+    cluster.getJournalNode(0).stopAndJoin(0);
+    
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+
+      // After recovery, JN0 comes back to life and JN1 crashes.
+      cluster.restartJournalNode(0);
+      cluster.getJournalNode(1).stopAndJoin(0);
+      
+      // Write segment but do not finalize
+      writeSegment(cluster, qjm, 101, 50, false);
+    } finally {
+      qjm.close();
+    }
+    
+    // State:
+    // JN0: 1-100 finalized, 101_inprogress (txns up to 150)
+    // Previously, JN0 had an accepted recovery 101-101 from an earlier recovery
+    // attempt.
+    // JN1: 1-100 finalized
+    // JN2: 1-100 finalized, 101_inprogress (txns up to 150)
+    
+    // We need to test that the accepted recovery 101-101 on JN0 doesn't
+    // end up truncating the log back to 101.
+
+    cluster.restartJournalNode(1);
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+    } finally {
+      qjm.close();
+    }
+  }
+  
+  @Test(timeout=20000)
+  public void testCrashBetweenSyncLogAndPersistPaxosData() throws Exception {
+    JournalFaultInjector faultInjector =
+        JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class);
+
+    setupLoggers345();
+
+    // Run recovery where the client only talks to JN0, JN1, such that it
+    // decides that the correct length is through txid 4.
+    // Only allow it to call acceptRecovery() on JN0.
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();    
+    cluster.getJournalNode(2).stopAndJoin(0);
+    injectIOE().when(spies.get(1)).acceptRecovery(
+        Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+    
+    tryRecoveryExpectingFailure();
+
+    cluster.restartJournalNode(2);
+    
+    // State at this point:
+    // JN0: edit log for 1-4, paxos recovery data for txid 4
+    // JN1: edit log for 1-4,
+    // JN2: edit log for 1-5
+    
+    // Run recovery again, but don't allow JN0 to respond to the
+    // prepareRecovery() call. This will cause recovery to decide
+    // on txid 5.
+    // Additionally, crash all of the nodes before they persist
+    // any new paxos data.
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();    
+    injectIOE().when(spies.get(0)).prepareRecovery(Mockito.eq(1L));
+
+    Mockito.doThrow(new IOException("Injected")).when(faultInjector)
+      .beforePersistPaxosData();
+    tryRecoveryExpectingFailure();
+    Mockito.reset(faultInjector);
+    
+    // State at this point:
+    // JN0: edit log for 1-5, paxos recovery data for txid 4
+    // !!!   This is the interesting bit: JN0's on-disk log data and its
+    //       persisted paxos data no longer match up!
+    // JN1: edit log for 1-5,
+    // JN2: edit log for 1-5,
+
+    // Now, stop JN2, and see if we can still start up even though
+    // JN0 is in a strange state where its log data is actually newer
+    // than its accepted Paxos state.
+
+    cluster.getJournalNode(2).stopAndJoin(0);
+    
+    qjm = createSpyingQJM();
+    try {
+      long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+      assertTrue(recovered >= 4); // 4 was committed to a quorum
+    } finally {
+      qjm.close();
+    }
+  }
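+
+  // Editorial note, not part of the original commit: the assertion above
+  // encodes the protocol's durability bound. Any txid acknowledged by a
+  // majority must survive recovery, so the recovered end txid may exceed,
+  // but never fall below, the highest quorum-committed txid (4 here).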
+  
+  private void tryRecoveryExpectingFailure() throws IOException {
+    try {
+      QJMTestUtil.recoverAndReturnLastTxn(qjm);
+      fail("Expected to fail recovery");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("Injected", qe);
+    } finally {
+      qjm.close();
+    }
+
+  }
+  
+  private Stubber injectIOE() {
+    return futureThrows(new IOException("Injected"));
+  }
+
+  @Test
+  public void testPurgeLogs() throws Exception {
+    for (int txid = 1; txid <= 5; txid++) {
+      writeSegment(cluster, qjm, txid, 1, true);
+    }
+    File curDir = cluster.getCurrentDir(0, JID);
+    GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 1),
+        NNStorage.getFinalizedEditsFileName(2, 2),
+        NNStorage.getFinalizedEditsFileName(3, 3),
+        NNStorage.getFinalizedEditsFileName(4, 4),
+        NNStorage.getFinalizedEditsFileName(5, 5));
+    File paxosDir = new File(curDir, "paxos");
+    GenericTestUtils.assertExists(paxosDir);
+
+    // Create new files in the paxos directory, which should get purged too.
+    assertTrue(new File(paxosDir, "1").createNewFile());
+    assertTrue(new File(paxosDir, "3").createNewFile());
+    
+    GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+        "1", "3");
+    
+    // Create some temporary files of the sort that are used during recovery.
+    assertTrue(new File(curDir,
+        "edits_inprogress_0000000000000000001.epoch=140").createNewFile());
+    assertTrue(new File(curDir,
+        "edits_inprogress_0000000000000000002.empty").createNewFile());
+    
+    qjm.purgeLogsOlderThan(3);
+    
+    // Log purging is asynchronous, so we have to wait for the calls
+    // to be sent and acknowledged before verifying.
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    
+    // Older edits should be purged
+    GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+        NNStorage.getFinalizedEditsFileName(3, 3),
+        NNStorage.getFinalizedEditsFileName(4, 4),
+        NNStorage.getFinalizedEditsFileName(5, 5));
+   
+    // Older paxos files should be purged
+    GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+        "3");
+  }
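+
+  // Editorial sketch, not part of the original commit: the retention
+  // predicate implied by the assertions above. purgeLogsOlderThan(3)
+  // removes any segment or paxos file whose last txid falls below the
+  // threshold and keeps the rest. The helper name is illustrative.
+  private static boolean illustrativeIsPurged(long lastTxId, long minTxIdToKeep) {
+    return lastTxId < minTxIdToKeep;
+  }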
+  
+  @Test
+  public void testToString() throws Exception {
+    GenericTestUtils.assertMatches(
+        qjm.toString(),
+        "QJM to \\[127.0.0.1:\\d+, 127.0.0.1:\\d+, 127.0.0.1:\\d+\\]");
+  }
+  
+  private QuorumJournalManager createSpyingQJM()
+      throws IOException, URISyntaxException {
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
+          @Override
+          protected ExecutorService createExecutor() {
+            // Don't parallelize calls to the quorum in the tests.
+            // This makes the tests more deterministic.
+            return MoreExecutors.sameThreadExecutor();
+          }
+        };
+        
+        return Mockito.spy(logger);
+      }
+    };
+    return new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory);
+  }
+
+  private static void waitForAllPendingCalls(AsyncLoggerSet als)
+      throws InterruptedException {
+    for (AsyncLogger l : als.getLoggersForTests()) {
+      IPCLoggerChannel ch = (IPCLoggerChannel)l;
+      ch.waitForAllPendingCalls();
+    }
+  }
+
+  private void checkRecovery(MiniJournalCluster cluster,
+      long segmentTxId, long expectedEndTxId)
+      throws IOException {
+    int numFinalized = 0;
+    for (int i = 0; i < cluster.getNumNodes(); i++) {
+      File logDir = cluster.getCurrentDir(i, JID);
+      EditLogFile elf = FileJournalManager.getLogFile(logDir, segmentTxId);
+      if (elf == null) {
+        continue;
+      }
+      if (!elf.isInProgress()) {
+        numFinalized++;
+        if (elf.getLastTxId() != expectedEndTxId) {
+          fail("File " + elf + " finalized to wrong txid, expected " +
+              expectedEndTxId);
+        }
+      }      
+    }
+    
+    if (numFinalized < cluster.getQuorumSize()) {
+      fail("Did not find a quorum of finalized logs starting at " +
+          segmentTxId);
+    }
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Stubber;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+
+/**
+ * True unit tests for QuorumJournalManager
+ */
+public class TestQuorumJournalManagerUnit {
+  static {
+    ((Log4JLogger)QuorumJournalManager.LOG).getLogger().setLevel(Level.ALL);
+  }
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L);
+
+  private Configuration conf = new Configuration();
+  private List<AsyncLogger> spyLoggers;
+  private QuorumJournalManager qjm;
+  
+  @Before
+  public void setup() throws Exception {
+    spyLoggers = ImmutableList.of(
+        mockLogger(),
+        mockLogger(),
+        mockLogger());
+
+    qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
+      @Override
+      protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
+        return spyLoggers;
+      }
+    };
+
+    for (AsyncLogger logger : spyLoggers) {
+      futureReturns(GetJournalStateResponseProto.newBuilder()
+          .setLastPromisedEpoch(0)
+          .setHttpPort(-1)
+          .build())
+        .when(logger).getJournalState();
+      
+      futureReturns(
+          NewEpochResponseProto.newBuilder().build()
+          ).when(logger).newEpoch(Mockito.anyLong());
+      
+      futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any());
+    }
+    
+    qjm.recoverUnfinalizedSegments();
+  }
+  
+  private AsyncLogger mockLogger() {
+    return Mockito.mock(AsyncLogger.class);
+  }
+  
+  static <V> Stubber futureReturns(V value) {
+    ListenableFuture<V> ret = Futures.immediateFuture(value);
+    return Mockito.doReturn(ret);
+  }
+  
+  static Stubber futureThrows(Throwable t) {
+    ListenableFuture<?> ret = Futures.immediateFailedFuture(t);
+    return Mockito.doReturn(ret);
+  }
+
+
+  @Test
+  public void testAllLoggersStartOk() throws Exception {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    qjm.startLogSegment(1);
+  }
+
+  @Test
+  public void testQuorumOfLoggersStartOk() throws Exception {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureThrows(new IOException("logger failed"))
+      .when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    qjm.startLogSegment(1);
+  }
+  
+  @Test
+  public void testQuorumOfLoggersFail() throws Exception {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureThrows(new IOException("logger failed"))
+    .when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureThrows(new IOException("logger failed"))
+      .when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    try {
+      qjm.startLogSegment(1);
+      fail("Did not throw when quorum failed");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("logger failed", qe);
+    }
+  }
+  
+  @Test
+  public void testWriteEdits() throws Exception {
+    EditLogOutputStream stm = createLogSegment();
+    writeOp(stm, 1);
+    writeOp(stm, 2);
+    
+    stm.setReadyToFlush();
+    writeOp(stm, 3);
+    
+    // The flush should log txns 1-2
+    futureReturns(null).when(spyLoggers.get(0)).sendEdits(
+        anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(1)).sendEdits(
+        anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(2)).sendEdits(
+        anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
+    stm.flush();
+
+    // Another flush should now log txn #3
+    stm.setReadyToFlush();
+    futureReturns(null).when(spyLoggers.get(0)).sendEdits(
+        anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(1)).sendEdits(
+        anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(2)).sendEdits(
+        anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
+    stm.flush();
+  }
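+
+  // Editorial note, not part of the original commit: the double-buffering
+  // contract exercised above. setReadyToFlush() seals the current buffer
+  // for sending while later writeOp() calls land in a fresh buffer, so
+  // txns 1-2 go out in the first flush and txn 3 only in the second.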
+  
+  @Test
+  public void testWriteEditsOneSlow() throws Exception {
+    EditLogOutputStream stm = createLogSegment();
+    writeOp(stm, 1);
+    stm.setReadyToFlush();
+    
+    // Make the first two logs respond immediately
+    futureReturns(null).when(spyLoggers.get(0)).sendEdits(
+        anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(1)).sendEdits(
+        anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
+    
+    // And make the third logger never respond
+    SettableFuture<Void> slowLog = SettableFuture.<Void>create();
+    Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits(
+        anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
+    stm.flush();
+    
+    Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L);
+  }
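+
+  // Editorial note, not part of the original commit: the flush above
+  // returns once a majority (loggers 0 and 1) has acked, without waiting
+  // for the stuck SettableFuture on logger 2; the committed txid is then
+  // advertised to the loggers via setCommittedTxId().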
+
+  private EditLogOutputStream createLogSegment() throws IOException {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    EditLogOutputStream stm = qjm.startLogSegment(1);
+    return stm;
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Maps;
+
+import static org.apache.hadoop.hdfs.qjournal.client.SegmentRecoveryComparator.INSTANCE;
+
+public class TestSegmentRecoveryComparator {
+  
+  private static Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> makeEntry(
+      PrepareRecoveryResponseProto proto) {
+    return Maps.immutableEntry(Mockito.mock(AsyncLogger.class), proto);
+  }
+  
+  @Test
+  public void testComparisons() {
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_3 =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(3L)
+              .setIsInProgress(true))
+          .setLastWriterEpoch(0L)
+          .build());
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_4 =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(4L)
+              .setIsInProgress(true))
+          .setLastWriterEpoch(0L)
+          .build());
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_4_ACCEPTED =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(4L)
+              .setIsInProgress(true))
+          .setLastWriterEpoch(0L)
+          .setAcceptedInEpoch(1L)
+          .build());
+
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> FINALIZED_1_3 =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(3L)
+              .setIsInProgress(false))
+          .setLastWriterEpoch(0L)
+          .build());
+
+    // Should compare equal to itself
+    assertEquals(0, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_3));
+    
+    // Longer log wins.
+    assertEquals(-1, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_4));
+    assertEquals(1, INSTANCE.compare(INPROGRESS_1_4, INPROGRESS_1_3));
+    
+    // Finalized log wins even over a longer in-progress segment
+    assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4, FINALIZED_1_3));
+    assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4));
+
+    // Finalized log wins even if the in-progress one has an accepted
+    // recovery proposal.
+    assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4_ACCEPTED, FINALIZED_1_3));
+    assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4_ACCEPTED));
+  }
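+
+  // Editorial note, not part of the original commit: taken together, the
+  // assertions encode the comparator's priority order: a finalized segment
+  // outranks any in-progress one, regardless of length or accepted-recovery
+  // proposals, and among the in-progress segments compared here the higher
+  // end txid wins.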
+}


