hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From umamah...@apache.org
Subject svn commit: r1342534 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src: contrib/bkjournal/ contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/ contrib/bkj...
Date Fri, 25 May 2012 08:57:00 GMT
Author: umamahesh
Date: Fri May 25 08:56:59 2012
New Revision: 1342534

URL: http://svn.apache.org/viewvc?rev=1342534&view=rev
Log:
HDFS-2717. BookKeeper Journal output stream doesn't check addComplete rc. Contributed by Ivan Kelly.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml Fri May 25 08:56:59 2012
@@ -55,6 +55,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
       <scope>compile</scope>
@@ -64,6 +70,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java Fri May 25 08:56:59 2012
@@ -41,6 +41,7 @@ class BookKeeperEditLogInputStream exten
   private final long firstTxId;
   private final long lastTxId;
   private final int logVersion;
+  private final boolean inProgress;
   private final LedgerHandle lh;
 
   private final FSEditLogOp.Reader reader;
@@ -69,6 +70,7 @@ class BookKeeperEditLogInputStream exten
     this.firstTxId = metadata.getFirstTxId();
     this.lastTxId = metadata.getLastTxId();
     this.logVersion = metadata.getVersion();
+    this.inProgress = metadata.isInProgress();
 
     BufferedInputStream bin = new BufferedInputStream(
         new LedgerInputStream(lh, firstBookKeeperEntry));
@@ -123,10 +125,28 @@ class BookKeeperEditLogInputStream exten
         lh.toString(), firstTxId, lastTxId);
   }
 
-  // TODO(HA): Test this.
   @Override
   public boolean isInProgress() {
-    return true;
+    return inProgress;
+  }
+
+  /**
+   * Skip forward to specified transaction id.
+   * Currently we do this by just iterating forward.
+   * If this proves to be too expensive, this can be reimplemented
+   * with a binary search over bk entries
+   */
+  public void skipTo(long txId) throws IOException {
+    long numToSkip = getFirstTxId() - txId;
+
+    FSEditLogOp op = null;
+    for (long i = 0; i < numToSkip; i++) {
+      op = readOp();
+    }
+    if (op != null && op.getTransactionId() != txId-1) {
+      throw new IOException("Corrupt stream, expected txid "
+          + (txId-1) + ", got " + op.getTransactionId());
+    }
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java Fri May 25 08:56:59 2012
@@ -233,11 +233,14 @@ public class BookKeeperJournalManager im
        */
       l.write(zkc, znodePath);
 
+      maxTxId.store(txId);
       return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
     } catch (Exception e) {
       if (currentLedger != null) {
         try {
+          long id = currentLedger.getId();
           currentLedger.close();
+          bkc.deleteLedger(id);
         } catch (Exception e2) {
           //log & ignore, an IOException will be thrown soon
           LOG.error("Error closing ledger", e2);
@@ -313,18 +316,34 @@ public class BookKeeperJournalManager im
     }
   }
 
-  // TODO(HA): Handle inProgressOk
-  EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
+  EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
       throws IOException {
     for (EditLogLedgerMetadata l : getLedgerList()) {
-      if (l.getFirstTxId() == fromTxnId) {
+      long lastTxId = l.getLastTxId();
+      if (l.isInProgress()) {
+        if (!inProgressOk) {
+          continue;
+        }
+
+        lastTxId = recoverLastTxId(l, false);
+      }
+
+      if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
         try {
-          LedgerHandle h = bkc.openLedger(l.getLedgerId(),
-                                          BookKeeper.DigestType.MAC,
-                                          digestpw.getBytes());
-          return new BookKeeperEditLogInputStream(h, l);
+          LedgerHandle h;
+          if (l.isInProgress()) { // we don't want to fence the current journal
+            h = bkc.openLedgerNoRecovery(l.getLedgerId(),
+                BookKeeper.DigestType.MAC, digestpw.getBytes());
+          } else {
+            h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
+                digestpw.getBytes());
+          }
+          BookKeeperEditLogInputStream s = new BookKeeperEditLogInputStream(h,
+              l);
+          s.skipTo(fromTxId);
+          return s;
         } catch (Exception e) {
-          throw new IOException("Could not open ledger for " + fromTxnId, e);
+          throw new IOException("Could not open ledger for " + fromTxId, e);
         }
       }
     }
@@ -354,26 +373,31 @@ public class BookKeeperJournalManager im
     }
   }
 
-  // TODO(HA): Handle inProgressOk
-  long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
+  long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
       throws IOException {
     long count = 0;
     long expectedStart = 0;
     for (EditLogLedgerMetadata l : getLedgerList()) {
+      long lastTxId = l.getLastTxId();
       if (l.isInProgress()) {
-        long endTxId = recoverLastTxId(l);
-        if (endTxId == HdfsConstants.INVALID_TXID) {
+        if (!inProgressOk) {
+          continue;
+        }
+
+        lastTxId = recoverLastTxId(l, false);
+        if (lastTxId == HdfsConstants.INVALID_TXID) {
           break;
         }
-        count += (endTxId - l.getFirstTxId()) + 1;
-        break;
       }
 
-      if (l.getFirstTxId() < fromTxnId) {
+      assert lastTxId >= l.getFirstTxId();
+
+      if (lastTxId < fromTxId) {
         continue;
-      } else if (l.getFirstTxId() == fromTxnId) {
-        count = (l.getLastTxId() - l.getFirstTxId()) + 1;
-        expectedStart = l.getLastTxId() + 1;
+      } else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
+        // we can start in the middle of a segment
+        count = (lastTxId - l.getFirstTxId()) + 1;
+        expectedStart = lastTxId + 1;
       } else {
         if (expectedStart != l.getFirstTxId()) {
           if (count == 0) {
@@ -384,8 +408,8 @@ public class BookKeeperJournalManager im
             break;
           }
         }
-        count += (l.getLastTxId() - l.getFirstTxId()) + 1;
-        expectedStart = l.getLastTxId() + 1;
+        count += (lastTxId - l.getFirstTxId()) + 1;
+        expectedStart = lastTxId + 1;
       }
     }
     return count;
@@ -404,7 +428,7 @@ public class BookKeeperJournalManager im
           String znode = ledgerPath + "/" + child;
           EditLogLedgerMetadata l
             = EditLogLedgerMetadata.read(zkc, znode);
-          long endTxId = recoverLastTxId(l);
+          long endTxId = recoverLastTxId(l, true);
           if (endTxId == HdfsConstants.INVALID_TXID) {
             LOG.error("Unrecoverable corruption has occurred in segment "
                       + l.toString() + " at path " + znode
@@ -474,11 +498,19 @@ public class BookKeeperJournalManager im
    * Find the id of the last edit log transaction writen to a edit log
    * ledger.
    */
-  private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
+  private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
+      throws IOException {
     try {
-      LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
-                                       BookKeeper.DigestType.MAC,
-                                       digestpw.getBytes());
+      LedgerHandle lh = null;
+      if (fence) {
+        lh = bkc.openLedger(l.getLedgerId(),
+                            BookKeeper.DigestType.MAC,
+                            digestpw.getBytes());
+      } else {
+        lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
+                                      BookKeeper.DigestType.MAC,
+                                      digestpw.getBytes());
+      }
       long lastAddConfirmed = lh.getLastAddConfirmed();
       BookKeeperEditLogInputStream in
         = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java?rev=1342534&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/BKJMUtil.java Fri May 25 08:56:59 2012
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+
+import java.io.IOException;
+import java.io.File;
+
+/**
+ * Utility class for setting up bookkeeper ensembles
+ * and bringing individual bookies up and down
+ */
+class BKJMUtil {
+  protected static final Log LOG = LogFactory.getLog(BKJMUtil.class);
+
+  int nextPort = 6000; // next port for additionally created bookies
+  private Thread bkthread = null;
+  private final static String zkEnsemble = "127.0.0.1:2181";
+  int numBookies;
+
+  BKJMUtil(final int numBookies) throws Exception {
+    this.numBookies = numBookies;
+
+    bkthread = new Thread() {
+        public void run() {
+          try {
+            String[] args = new String[1];
+            args[0] = String.valueOf(numBookies);
+            LOG.info("Starting bk");
+            LocalBookKeeper.main(args);
+          } catch (InterruptedException e) {
+            // go away quietly
+          } catch (Exception e) {
+            LOG.error("Error starting local bk", e);
+          }
+        }
+      };
+  }
+
+  void start() throws Exception {
+    bkthread.start();
+    if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
+      throw new Exception("Error starting zookeeper/bookkeeper");
+    }
+    assertEquals("Not all bookies started",
+                 numBookies, checkBookiesUp(numBookies, 10));
+  }
+
+  void teardown() throws Exception {
+    if (bkthread != null) {
+      bkthread.interrupt();
+      bkthread.join();
+    }
+  }
+
+  static ZooKeeper connectZooKeeper()
+      throws IOException, KeeperException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
+        public void process(WatchedEvent event) {
+          if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+            latch.countDown();
+          }
+        }
+      });
+    if (!latch.await(3, TimeUnit.SECONDS)) {
+      throw new IOException("Zookeeper took too long to connect");
+    }
+    return zkc;
+  }
+
+  static URI createJournalURI(String path) throws Exception {
+    return URI.create("bookkeeper://" + zkEnsemble + path);
+  }
+
+  static void addJournalManagerDefinition(Configuration conf) {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".bookkeeper",
+             "org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager");
+  }
+
+  BookieServer newBookie() throws Exception {
+    int port = nextPort++;
+    ServerConfiguration bookieConf = new ServerConfiguration();
+    bookieConf.setBookiePort(port);
+    File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
+                                      "test");
+    tmpdir.delete();
+    tmpdir.mkdir();
+
+    bookieConf.setZkServers(zkEnsemble);
+    bookieConf.setJournalDirName(tmpdir.getPath());
+    bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
+
+    BookieServer b = new BookieServer(bookieConf);
+    b.start();
+    for (int i = 0; i < 10 && !b.isRunning(); i++) {
+      Thread.sleep(10000);
+    }
+    if (!b.isRunning()) {
+      throw new IOException("Bookie would not start");
+    }
+    return b;
+  }
+
+  /**
+   * Check that a number of bookies are available
+   * @param count number of bookies required
+   * @param timeout number of seconds to wait for bookies to start
+   * @throws IOException if bookies are not started by the time the timeout hits
+   */
+  int checkBookiesUp(int count, int timeout) throws Exception {
+    ZooKeeper zkc = connectZooKeeper();
+    try {
+      boolean up = false;
+      int mostRecentSize = 0;
+      for (int i = 0; i < timeout; i++) {
+        try {
+          List<String> children = zkc.getChildren("/ledgers/available",
+                                                  false);
+          mostRecentSize = children.size();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found " + mostRecentSize + " bookies up, "
+                      + "waiting for " + count);
+            if (LOG.isTraceEnabled()) {
+              for (String child : children) {
+                LOG.trace(" server: " + child);
+              }
+            }
+          }
+          if (mostRecentSize == count) {
+            up = true;
+            break;
+          }
+        } catch (KeeperException e) {
+          // ignore
+        }
+        Thread.sleep(1000);
+      }
+      return mostRecentSize;
+    } finally {
+      zkc.close();
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java?rev=1342534&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java Fri May 25 08:56:59 2012
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+
+import org.apache.hadoop.ipc.RemoteException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.bookkeeper.proto.BookieServer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Integration test to ensure that the BookKeeper JournalManager
+ * works for HDFS Namenode HA
+ */
+public class TestBookKeeperAsHASharedDir {
+  static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
+
+  private static BKJMUtil bkutil;
+  static int numBookies = 3;
+
+  private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
+
+  @BeforeClass
+  public static void setupBookkeeper() throws Exception {
+    bkutil = new BKJMUtil(numBookies);
+    bkutil.start();
+  }
+
+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    bkutil.teardown();
+  }
+
+  /**
+   * Test simple HA failover usecase with BK
+   */
+  @Test
+  public void testFailoverWithBK() throws Exception {
+    Runtime mockRuntime1 = mock(Runtime.class);
+    Runtime mockRuntime2 = mock(Runtime.class);
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
+               BKJMUtil.createJournalURI("/hotfailover").toString());
+      BKJMUtil.addJournalManagerDefinition(conf);
+
+      cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .manageNameDfsSharedDirs(false)
+        .build();
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
+      FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
+
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      Path p = new Path("/testBKJMfailover");
+
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+
+      fs.mkdirs(p);
+      cluster.shutdownNameNode(0);
+
+      cluster.transitionToActive(1);
+
+      assertTrue(fs.exists(p));
+    } finally {
+      verify(mockRuntime1, times(0)).exit(anyInt());
+      verify(mockRuntime2, times(0)).exit(anyInt());
+
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test HA failover, where BK, as the shared storage, fails.
+   * Once it becomes available again, a standby can come up.
+   * Verify that any write happening after the BK fail is not
+   * available on the standby.
+   */
+  @Test
+  public void testFailoverWithFailingBKCluster() throws Exception {
+    int ensembleSize = numBookies + 1;
+    BookieServer newBookie = bkutil.newBookie();
+    assertEquals("New bookie didn't start",
+                 ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+
+    BookieServer replacementBookie = null;
+
+    Runtime mockRuntime1 = mock(Runtime.class);
+    Runtime mockRuntime2 = mock(Runtime.class);
+
+    MiniDFSCluster cluster = null;
+
+    try {
+      Configuration conf = new Configuration();
+      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
+               BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+                  ensembleSize);
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+                  ensembleSize);
+      BKJMUtil.addJournalManagerDefinition(conf);
+
+      cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .manageNameDfsSharedDirs(false)
+        .build();
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
+      FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
+
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      Path p1 = new Path("/testBKJMFailingBKCluster1");
+      Path p2 = new Path("/testBKJMFailingBKCluster2");
+
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+
+      fs.mkdirs(p1);
+      newBookie.shutdown(); // will take down shared storage
+      assertEquals("New bookie didn't stop",
+                   numBookies, bkutil.checkBookiesUp(numBookies, 10));
+
+      // mkdirs will "succeed", but nn have called runtime.exit
+      fs.mkdirs(p2);
+      verify(mockRuntime1, atLeastOnce()).exit(anyInt());
+      verify(mockRuntime2, times(0)).exit(anyInt());
+      cluster.shutdownNameNode(0);
+
+      try {
+        cluster.transitionToActive(1);
+        fail("Shouldn't have been able to transition with bookies down");
+      } catch (ServiceFailedException e) {
+        assertTrue("Wrong exception",
+            e.getMessage().contains("Failed to start active services"));
+      }
+      verify(mockRuntime2, atLeastOnce()).exit(anyInt());
+
+      replacementBookie = bkutil.newBookie();
+      assertEquals("Replacement bookie didn't start",
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+      cluster.transitionToActive(1); // should work fine now
+
+      assertTrue(fs.exists(p1));
+      assertFalse(fs.exists(p2));
+    } finally {
+      newBookie.shutdown();
+      if (replacementBookie != null) {
+        replacementBookie.shutdown();
+      }
+
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test that two namenodes can't become primary at the same
+   * time.
+   */
+  @Test
+  public void testMultiplePrimariesStarted() throws Exception {
+    Runtime mockRuntime1 = mock(Runtime.class);
+    Runtime mockRuntime2 = mock(Runtime.class);
+    Path p1 = new Path("/testBKJMMultiplePrimary");
+
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
+               BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
+      BKJMUtil.addJournalManagerDefinition(conf);
+
+      cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .manageNameDfsSharedDirs(false)
+        .build();
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FSEditLogTestUtil.setRuntimeForEditLog(nn1, mockRuntime1);
+      FSEditLogTestUtil.setRuntimeForEditLog(nn2, mockRuntime2);
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      fs.mkdirs(p1);
+      nn1.getRpcServer().rollEditLog();
+      try {
+        cluster.transitionToActive(1);
+        fail("Shouldn't have been able to start two primaries"
+             + " with single shared storage");
+      } catch (ServiceFailedException sfe) {
+        assertTrue("Wrong exception",
+            sfe.getMessage().contains("Failed to start active services"));
+      }
+    } finally {
+      verify(mockRuntime1, times(0)).exit(anyInt());
+      verify(mockRuntime2, atLeastOnce()).exit(anyInt());
+
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java?rev=1342534&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java Fri May 25 08:56:59 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import org.apache.hadoop.hdfs.server.namenode.ha.TestStandbyCheckpoints;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+
+import org.junit.Before;
+import org.junit.After;
+
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+/**
+ * Runs the same tests as TestStandbyCheckpoints, but
+ * using a bookkeeper journal manager as the shared directory
+ */
+public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
+  private static BKJMUtil bkutil = null;
+  static int numBookies = 3;
+  static int journalCount = 0;
+
+  @Override
+  @Before
+  public void setupCluster() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
+             BKJMUtil.createJournalURI("/checkpointing" + journalCount++)
+             .toString());
+    BKJMUtil.addJournalManagerDefinition(conf);
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+        .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
+        .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
+
+    cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(topology)
+      .numDataNodes(0)
+      .manageNameDfsSharedDirs(false)
+      .build();
+    cluster.waitActive();
+
+    nn0 = cluster.getNameNode(0);
+    nn1 = cluster.getNameNode(1);
+    fs = HATestUtil.configureFailoverFs(cluster, conf);
+
+    cluster.transitionToActive(0);
+  }
+
+  @BeforeClass
+  public static void startBK() throws Exception {
+    journalCount = 0;
+    bkutil = new BKJMUtil(numBookies);
+    bkutil.start();
+  }
+
+  @AfterClass
+  public static void shutdownBK() throws Exception {
+    if (bkutil != null) {
+      bkutil.teardown();
+    }
+  }
+
+  @Override
+  public void testCheckpointCancellation() throws Exception {
+    // Overriden as the implementation in the superclass assumes that writes
+    // are to a file. This should be fixed at some point
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Fri May 25 08:56:59 2012
@@ -18,55 +18,23 @@
 package org.apache.hadoop.contrib.bkjournal;
 
 import static org.junit.Assert.*;
-
-import java.net.URI;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-
-import java.io.RandomAccessFile;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.SecurityUtil;
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
 
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -75,125 +43,26 @@ public class TestBookKeeperJournalManage
   static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
   
   private static final long DEFAULT_SEGMENT_SIZE = 1000;
-  private static final String zkEnsemble = "localhost:2181";
-  final static private int numBookies = 5;
 
-  private static Thread bkthread;
   protected static Configuration conf = new Configuration();
   private ZooKeeper zkc;
+  private static BKJMUtil bkutil;
+  static int numBookies = 3;
 
-  
-  static int nextPort = 6000; // next port for additionally created bookies
-
-  private static ZooKeeper connectZooKeeper(String ensemble) 
-      throws IOException, KeeperException, InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-        
-    ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
-        public void process(WatchedEvent event) {
-          if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
-            latch.countDown();
-          }
-        }
-      });
-    if (!latch.await(3, TimeUnit.SECONDS)) {
-      throw new IOException("Zookeeper took too long to connect");
-    }
-    return zkc;
-  }
-
-  private static BookieServer newBookie() throws Exception {
-    int port = nextPort++;
-    ServerConfiguration bookieConf = new ServerConfiguration();
-    bookieConf.setBookiePort(port);
-    File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
-                                      "test");
-    tmpdir.delete();
-    tmpdir.mkdir();
-
-    bookieConf.setZkServers(zkEnsemble);
-    bookieConf.setJournalDirName(tmpdir.getPath());
-    bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
-
-    BookieServer b = new BookieServer(bookieConf);
-    b.start();
-    for (int i = 0; i < 10 && !b.isRunning(); i++) {
-      Thread.sleep(10000);
-    }
-    if (!b.isRunning()) {
-      throw new IOException("Bookie would not start");
-    }
-    return b;
+  @BeforeClass
+  public static void setupBookkeeper() throws Exception {
+    bkutil = new BKJMUtil(numBookies);
+    bkutil.start();
   }
 
-  /**
-   * Check that a number of bookies are available
-   * @param count number of bookies required
-   * @param timeout number of seconds to wait for bookies to start
-   * @throws IOException if bookies are not started by the time the timeout hits
-   */
-  private static int checkBookiesUp(int count, int timeout) throws Exception {
-    ZooKeeper zkc = connectZooKeeper(zkEnsemble);
-    try {
-      boolean up = false;
-      int mostRecentSize = 0;
-      for (int i = 0; i < timeout; i++) {
-        try {
-          List<String> children = zkc.getChildren("/ledgers/available",
-                                                  false);
-          mostRecentSize = children.size();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found " + mostRecentSize + " bookies up, "
-                      + "waiting for " + count);
-            if (LOG.isTraceEnabled()) {
-              for (String child : children) {
-                LOG.trace(" server: " + child);
-              }
-            }
-          }
-          if (mostRecentSize == count) {
-            up = true;
-            break;
-          }
-        } catch (KeeperException e) {
-          // ignore
-        }
-        Thread.sleep(1000);
-      }
-      return mostRecentSize;
-    } finally {
-      zkc.close();
-    }
+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    bkutil.teardown();
   }
 
-  @BeforeClass
-  public static void setupBookkeeper() throws Exception {
-    bkthread = new Thread() {
-        public void run() {
-          try {
-            String[] args = new String[1];
-            args[0] = String.valueOf(numBookies);
-            LOG.info("Starting bk");
-            LocalBookKeeper.main(args);
-          } catch (InterruptedException e) {
-            // go away quietly
-          } catch (Exception e) {
-            LOG.error("Error starting local bk", e);
-          }
-        }
-      };
-    bkthread.start();
-    
-    if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
-      throw new Exception("Error starting zookeeper/bookkeeper");
-    }
-    assertEquals("Not all bookies started", 
-                 numBookies, checkBookiesUp(numBookies, 10));
-  }
-  
   @Before
   public void setup() throws Exception {
-    zkc = connectZooKeeper(zkEnsemble);
+    zkc = BKJMUtil.connectZooKeeper();
   }
 
   @After
@@ -201,18 +70,10 @@ public class TestBookKeeperJournalManage
     zkc.close();
   }
 
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    if (bkthread != null) {
-      bkthread.interrupt();
-      bkthread.join();
-    }
-  }
-
   @Test
   public void testSimpleWrite() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
+        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
     long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -231,8 +92,8 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testNumberOfTransactions() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
     long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
@@ -249,8 +110,8 @@ public class TestBookKeeperJournalManage
 
   @Test 
   public void testNumberOfTransactionsWithGaps() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -262,9 +123,11 @@ public class TestBookKeeperJournalManage
       }
       out.close();
       bkjm.finalizeLogSegment(start, txid-1);
-      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
+      assertNotNull(
+          zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
     }
-    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
+    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
+                                         DEFAULT_SEGMENT_SIZE*2), -1);
     
     long numTrans = bkjm.getNumberOfTransactions(1, true);
     assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
@@ -282,8 +145,8 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -296,7 +159,8 @@ public class TestBookKeeperJournalManage
       
       out.close();
       bkjm.finalizeLogSegment(start, (txid-1));
-      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
+      assertNotNull(
+          zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
     }
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(start);
@@ -320,8 +184,8 @@ public class TestBookKeeperJournalManage
    */
   @Test
   public void testWriteRestartFrom1() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
     long txid = 1;
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(txid);
@@ -375,10 +239,10 @@ public class TestBookKeeperJournalManage
   @Test
   public void testTwoWriters() throws Exception {
     long start = 1;
-    BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
-    BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+    BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
+    BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
     
     EditLogOutputStream out1 = bkjm1.startLogSegment(start);
     try {
@@ -391,8 +255,8 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testSimpleRead() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
     long txid = 1;
     final long numTransactions = 10000;
     EditLogOutputStream out = bkjm.startLogSegment(1);
@@ -416,8 +280,8 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testSimpleRecovery() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
     EditLogOutputStream out = bkjm.startLogSegment(1);
     long txid = 1;
     for (long i = 1 ; i <= 100; i++) {
@@ -448,13 +312,13 @@ public class TestBookKeeperJournalManage
    */
   @Test
   public void testAllBookieFailure() throws Exception {
-    BookieServer bookieToFail = newBookie();
+    BookieServer bookieToFail = bkutil.newBookie();
     BookieServer replacementBookie = null;
 
     try {
       int ensembleSize = numBookies + 1;
       assertEquals("New bookie didn't start",
-                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
 
       // ensure that the journal manager has to use all bookies,
       // so that a failure will fail the journal manager
@@ -465,8 +329,7 @@ public class TestBookKeeperJournalManage
                   ensembleSize);
       long txid = 1;
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          URI.create("bookkeeper://" + zkEnsemble
-                     + "/hdfsjournal-allbookiefailure"));
+          BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
       EditLogOutputStream out = bkjm.startLogSegment(txid);
 
       for (long i = 1 ; i <= 3; i++) {
@@ -478,7 +341,7 @@ public class TestBookKeeperJournalManage
       out.flush();
       bookieToFail.shutdown();
       assertEquals("New bookie didn't die",
-                   numBookies, checkBookiesUp(numBookies, 10));
+                   numBookies, bkutil.checkBookiesUp(numBookies, 10));
 
       try {
         for (long i = 1 ; i <= 3; i++) {
@@ -494,10 +357,10 @@ public class TestBookKeeperJournalManage
         assertTrue("Invalid exception message",
                    ioe.getMessage().contains("Failed to write to bookkeeper"));
       }
-      replacementBookie = newBookie();
+      replacementBookie = bkutil.newBookie();
 
       assertEquals("New bookie didn't start",
-                   numBookies+1, checkBookiesUp(numBookies+1, 10));
+                   numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
       out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -517,7 +380,7 @@ public class TestBookKeeperJournalManage
       }
       bookieToFail.shutdown();
 
-      if (checkBookiesUp(numBookies, 30) != numBookies) {
+      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
         LOG.warn("Not all bookies from this test shut down, expect errors");
       }
     }
@@ -530,13 +393,13 @@ public class TestBookKeeperJournalManage
    */
   @Test
   public void testOneBookieFailure() throws Exception {
-    BookieServer bookieToFail = newBookie();
+    BookieServer bookieToFail = bkutil.newBookie();
     BookieServer replacementBookie = null;
 
     try {
       int ensembleSize = numBookies + 1;
       assertEquals("New bookie didn't start",
-                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
 
       // ensure that the journal manager has to use all bookies,
       // so that a failure will fail the journal manager
@@ -547,8 +410,7 @@ public class TestBookKeeperJournalManage
                   ensembleSize);
       long txid = 1;
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          URI.create("bookkeeper://" + zkEnsemble
-                     + "/hdfsjournal-onebookiefailure"));
+          BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
       EditLogOutputStream out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -558,12 +420,12 @@ public class TestBookKeeperJournalManage
       out.setReadyToFlush();
       out.flush();
 
-      replacementBookie = newBookie();
+      replacementBookie = bkutil.newBookie();
       assertEquals("replacement bookie didn't start",
-                   ensembleSize+1, checkBookiesUp(ensembleSize+1, 10));
+                   ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
       bookieToFail.shutdown();
       assertEquals("New bookie didn't die",
-                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
 
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -581,10 +443,10 @@ public class TestBookKeeperJournalManage
       }
       bookieToFail.shutdown();
 
-      if (checkBookiesUp(numBookies, 30) != numBookies) {
+      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
         LOG.warn("Not all bookies from this test shut down, expect errors");
       }
     }
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java Fri May 25 08:56:59 2012
@@ -36,4 +36,9 @@ public class FSEditLogTestUtil {
     FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
     return (validation.getEndTxId() - in.getFirstTxId()) + 1;
   }
+
+  public static void setRuntimeForEditLog(NameNode nn, Runtime rt) {
+    nn.setRuntimeForTesting(rt);
+    nn.getFSImage().getEditLog().setRuntimeForTesting(rt);
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri May 25 08:56:59 2012
@@ -231,6 +231,10 @@ public class FSEditLog  {
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
 
     journalSet = new JournalSet(minimumRedundantJournals);
+    // set runtime so we can test starting with a faulty or unavailable
+    // shared directory
+    this.journalSet.setRuntimeForTesting(runtime);
+
     for (URI u : dirs) {
       boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
           .contains(u);
@@ -842,7 +846,7 @@ public class FSEditLog  {
    * Used only by unit tests.
    */
   @VisibleForTesting
-  synchronized void setRuntimeForTesting(Runtime runtime) {
+  synchronized public void setRuntimeForTesting(Runtime runtime) {
     this.runtime = runtime;
     this.journalSet.setRuntimeForTesting(runtime);
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri May 25 08:56:59 2012
@@ -133,6 +133,7 @@ public class MiniDFSCluster {
     private int numDataNodes = 1;
     private boolean format = true;
     private boolean manageNameDfsDirs = true;
+    private boolean manageNameDfsSharedDirs = true;
     private boolean manageDataDfsDirs = true;
     private StartupOption option = null;
     private String[] racks = null; 
@@ -190,6 +191,14 @@ public class MiniDFSCluster {
     /**
      * Default: true
      */
+    public Builder manageNameDfsSharedDirs(boolean val) {
+      this.manageNameDfsSharedDirs = val;
+      return this;
+    }
+
+    /**
+     * Default: true
+     */
     public Builder manageDataDfsDirs(boolean val) {
       this.manageDataDfsDirs = val;
       return this;
@@ -288,6 +297,7 @@ public class MiniDFSCluster {
                        builder.numDataNodes,
                        builder.format,
                        builder.manageNameDfsDirs,
+                       builder.manageNameDfsSharedDirs,
                        builder.manageDataDfsDirs,
                        builder.option,
                        builder.racks,
@@ -527,7 +537,7 @@ public class MiniDFSCluster {
                         long[] simulatedCapacities) throws IOException {
     this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
     initMiniDFSCluster(conf, numDataNodes, format,
-        manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
+        manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts,
         simulatedCapacities, null, true, false,
         MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0));
   }
@@ -535,7 +545,8 @@ public class MiniDFSCluster {
   private void initMiniDFSCluster(
       Configuration conf,
       int numDataNodes, boolean format, boolean manageNameDfsDirs,
-      boolean manageDataDfsDirs, StartupOption operation, String[] racks,
+      boolean manageNameDfsSharedDirs, boolean manageDataDfsDirs,
+      StartupOption operation, String[] racks,
       String[] hosts, long[] simulatedCapacities, String clusterId,
       boolean waitSafeMode, boolean setupHostsFile,
       MiniDFSNNTopology nnTopology)
@@ -574,7 +585,8 @@ public class MiniDFSCluster {
     
     federation = nnTopology.isFederated();
     createNameNodesAndSetConf(
-        nnTopology, manageNameDfsDirs, format, operation, clusterId, conf);
+        nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
+        format, operation, clusterId, conf);
     
     if (format) {
       if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
@@ -595,8 +607,8 @@ public class MiniDFSCluster {
   }
   
   private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
-      boolean manageNameDfsDirs, boolean format, StartupOption operation,
-      String clusterId,
+      boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
+      boolean format, StartupOption operation, String clusterId,
       Configuration conf) throws IOException {
     Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
         "empty NN topology: no namenodes specified!");
@@ -641,7 +653,7 @@ public class MiniDFSCluster {
       if (nnIds.size() > 1) {
         conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()),
             Joiner.on(",").join(nnIds));
-        if (manageNameDfsDirs) {
+        if (manageNameDfsSharedDirs) {
           URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1); 
           conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
         }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java?rev=1342534&r1=1342533&r2=1342534&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java Fri May 25 08:56:59 2012
@@ -54,9 +54,9 @@ import com.google.common.collect.Lists;
 
 public class TestStandbyCheckpoints {
   private static final int NUM_DIRS_IN_LOG = 200000;
-  private MiniDFSCluster cluster;
-  private NameNode nn0, nn1;
-  private FileSystem fs;
+  protected MiniDFSCluster cluster;
+  protected NameNode nn0, nn1;
+  protected FileSystem fs;
 
   @SuppressWarnings("rawtypes")
   @Before



Mime
View raw message