hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1346682 [2/9] - in /hadoop/common/branches/HDFS-3092/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/contrib/bkjournal/ ha...
Date: Wed, 06 Jun 2012 00:18:04 GMT
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Wed Jun  6 00:17:38 2012
@@ -18,53 +18,31 @@
 package org.apache.hadoop.contrib.bkjournal;
 
 import static org.junit.Assert.*;
-
-import java.net.URI;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.util.LocalBookKeeper;
-
-import java.io.RandomAccessFile;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.SecurityUtil;
+import static org.mockito.Mockito.spy;
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
 
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -73,79 +51,26 @@ public class TestBookKeeperJournalManage
   static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
   
   private static final long DEFAULT_SEGMENT_SIZE = 1000;
-  private static final String zkEnsemble = "localhost:2181";
 
-  private static Thread bkthread;
   protected static Configuration conf = new Configuration();
   private ZooKeeper zkc;
-
-  private static ZooKeeper connectZooKeeper(String ensemble) 
-      throws IOException, KeeperException, InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-        
-    ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
-        public void process(WatchedEvent event) {
-          if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
-            latch.countDown();
-          }
-        }
-      });
-    if (!latch.await(3, TimeUnit.SECONDS)) {
-      throw new IOException("Zookeeper took too long to connect");
-    }
-    return zkc;
-  }
+  private static BKJMUtil bkutil;
+  static int numBookies = 3;
 
   @BeforeClass
   public static void setupBookkeeper() throws Exception {
-    final int numBookies = 5;
-    bkthread = new Thread() {
-        public void run() {
-          try {
-            String[] args = new String[1];
-            args[0] = String.valueOf(numBookies);
-            LOG.info("Starting bk");
-            LocalBookKeeper.main(args);
-          } catch (InterruptedException e) {
-            // go away quietly
-          } catch (Exception e) {
-            LOG.error("Error starting local bk", e);
-          }
-        }
-      };
-    bkthread.start();
-    
-    if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
-      throw new Exception("Error starting zookeeper/bookkeeper");
-    }
+    bkutil = new BKJMUtil(numBookies);
+    bkutil.start();
+  }
 
-    ZooKeeper zkc = connectZooKeeper(zkEnsemble);
-    try {
-      boolean up = false;
-      for (int i = 0; i < 10; i++) {
-        try {
-          List<String> children = zkc.getChildren("/ledgers/available", 
-                                                  false);
-          if (children.size() == numBookies) {
-            up = true;
-            break;
-          }
-        } catch (KeeperException e) {
-          // ignore
-        }
-        Thread.sleep(1000);
-      }
-      if (!up) {
-        throw new IOException("Not enough bookies started");
-      }
-    } finally {
-      zkc.close();
-    }
+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    bkutil.teardown();
   }
-  
+
   @Before
   public void setup() throws Exception {
-    zkc = connectZooKeeper(zkEnsemble);
+    zkc = BKJMUtil.connectZooKeeper();
   }
 
   @After
@@ -153,19 +78,10 @@ public class TestBookKeeperJournalManage
     zkc.close();
   }
 
-  @AfterClass
-  public static void teardownBookkeeper() throws Exception {
-    if (bkthread != null) {
-      bkthread.interrupt();
-      bkthread.join();
-    }
-  }
-
   @Test
   public void testSimpleWrite() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
-    long txid = 1;
+        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -178,14 +94,13 @@ public class TestBookKeeperJournalManage
     String zkpath = bkjm.finalizedLedgerZNode(1, 100);
     
     assertNotNull(zkc.exists(zkpath, false));
-    assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
   }
 
   @Test
   public void testNumberOfTransactions() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
-    long txid = 1;
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -201,8 +116,8 @@ public class TestBookKeeperJournalManage
 
   @Test 
   public void testNumberOfTransactionsWithGaps() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -214,9 +129,11 @@ public class TestBookKeeperJournalManage
       }
       out.close();
       bkjm.finalizeLogSegment(start, txid-1);
-      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
+      assertNotNull(
+          zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
     }
-    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
+    zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
+                                         DEFAULT_SEGMENT_SIZE*2), -1);
     
     long numTrans = bkjm.getNumberOfTransactions(1, true);
     assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
@@ -234,8 +151,8 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -248,7 +165,8 @@ public class TestBookKeeperJournalManage
       
       out.close();
       bkjm.finalizeLogSegment(start, (txid-1));
-      assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
+      assertNotNull(
+          zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
     }
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(start);
@@ -272,8 +190,8 @@ public class TestBookKeeperJournalManage
    */
   @Test
   public void testWriteRestartFrom1() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
     long txid = 1;
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(txid);
@@ -327,25 +245,26 @@ public class TestBookKeeperJournalManage
   @Test
   public void testTwoWriters() throws Exception {
     long start = 1;
-    BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
-    BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+    BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
+    BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
     
     EditLogOutputStream out1 = bkjm1.startLogSegment(start);
     try {
-      EditLogOutputStream out2 = bkjm2.startLogSegment(start);
+      bkjm2.startLogSegment(start);
       fail("Shouldn't have been able to open the second writer");
     } catch (IOException ioe) {
       LOG.info("Caught exception as expected", ioe);
+    } finally {
+      out1.close();
     }
   }
 
   @Test
   public void testSimpleRead() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
-    long txid = 1;
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
     final long numTransactions = 10000;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= numTransactions; i++) {
@@ -368,10 +287,9 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testSimpleRecovery() throws Exception {
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, 
-        URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
     EditLogOutputStream out = bkjm.startLogSegment(1);
-    long txid = 1;
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
       op.setTransactionId(i);
@@ -385,11 +303,372 @@ public class TestBookKeeperJournalManage
 
 
     assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
-    assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
+    assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false));
 
     bkjm.recoverUnfinalizedSegments();
 
     assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
-    assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
+  }
+
+  /**
+   * Test that if enough bookies fail to prevent an ensemble from forming,
+   * writes to bookkeeper will fail. Test that once an ensemble is
+   * available again, the journal manager can continue to write.
+   */
+  @Test
+  public void testAllBookieFailure() throws Exception {
+    BookieServer bookieToFail = bkutil.newBookie();
+    BookieServer replacementBookie = null;
+
+    try {
+      int ensembleSize = numBookies + 1;
+      assertEquals("New bookie didn't start",
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+
+      // ensure that the journal manager has to use all bookies,
+      // so that a failure will fail the journal manager
+      Configuration conf = new Configuration();
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+                  ensembleSize);
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+                  ensembleSize);
+      long txid = 1;
+      BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+          BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
+      EditLogOutputStream out = bkjm.startLogSegment(txid);
+
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+      out.setReadyToFlush();
+      out.flush();
+      bookieToFail.shutdown();
+      assertEquals("New bookie didn't die",
+                   numBookies, bkutil.checkBookiesUp(numBookies, 10));
+
+      try {
+        for (long i = 1 ; i <= 3; i++) {
+          FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+          op.setTransactionId(txid++);
+          out.write(op);
+        }
+        out.setReadyToFlush();
+        out.flush();
+        fail("should not get to this stage");
+      } catch (IOException ioe) {
+        LOG.debug("Error writing to bookkeeper", ioe);
+        assertTrue("Invalid exception message",
+                   ioe.getMessage().contains("Failed to write to bookkeeper"));
+      }
+      replacementBookie = bkutil.newBookie();
+
+      assertEquals("New bookie didn't start",
+                   numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
+      bkjm.recoverUnfinalizedSegments();
+      out = bkjm.startLogSegment(txid);
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+
+      out.setReadyToFlush();
+      out.flush();
+
+    } catch (Exception e) {
+      LOG.error("Exception in test", e);
+      throw e;
+    } finally {
+      if (replacementBookie != null) {
+        replacementBookie.shutdown();
+      }
+      bookieToFail.shutdown();
+
+      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
+        LOG.warn("Not all bookies from this test shut down, expect errors");
+      }
+    }
+  }
+
+  /**
+   * Test that a BookKeeper JM can continue to work across the
+   * failure of a bookie. This should be handled transparently
+   * by bookkeeper.
+   */
+  @Test
+  public void testOneBookieFailure() throws Exception {
+    BookieServer bookieToFail = bkutil.newBookie();
+    BookieServer replacementBookie = null;
+
+    try {
+      int ensembleSize = numBookies + 1;
+      assertEquals("New bookie didn't start",
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+
+      // ensure that the journal manager has to use all bookies,
+      // so that a failure will fail the journal manager
+      Configuration conf = new Configuration();
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+                  ensembleSize);
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+                  ensembleSize);
+      long txid = 1;
+      BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+          BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
+      EditLogOutputStream out = bkjm.startLogSegment(txid);
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+      out.setReadyToFlush();
+      out.flush();
+
+      replacementBookie = bkutil.newBookie();
+      assertEquals("replacement bookie didn't start",
+                   ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
+      bookieToFail.shutdown();
+      assertEquals("New bookie didn't die",
+                   ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+      out.setReadyToFlush();
+      out.flush();
+    } catch (Exception e) {
+      LOG.error("Exception in test", e);
+      throw e;
+    } finally {
+      if (replacementBookie != null) {
+        replacementBookie.shutdown();
+      }
+      bookieToFail.shutdown();
+
+      if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
+        LOG.warn("Not all bookies from this test shut down, expect errors");
+      }
+    }
+  }
+  
+  /**
+   * If a journal manager has an empty inprogress node, ensure that we throw
+   * an error: this should not be possible, and implies that some third party
+   * has corrupted the zookeeper state.
+   */
+  @Test
+  public void testEmptyInprogressNode() throws Exception {
+    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    out = bkjm.startLogSegment(101);
+    out.close();
+    bkjm.close();
+    String inprogressZNode = bkjm.inprogressZNode(101);
+    zkc.setData(inprogressZNode, new byte[0], -1);
+
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    try {
+      bkjm.recoverUnfinalizedSegments();
+      fail("Should have failed. There should be no way of creating"
+          + " an empty inprogress znode");
+    } catch (IOException e) {
+      // correct behaviour
+      assertTrue("Exception different than expected", e.getMessage().contains(
+          "Invalid ledger entry,"));
+    } finally {
+      bkjm.close();
+    }
+  }
+
+  /**
+   * If a journal manager has a corrupt inprogress node, ensure that we throw
+   * an error: this should not be possible, and implies that some third party
+   * has corrupted the zookeeper state.
+   */
+  @Test
+  public void testCorruptInprogressNode() throws Exception {
+    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    out = bkjm.startLogSegment(101);
+    out.close();
+    bkjm.close();
+
+    String inprogressZNode = bkjm.inprogressZNode(101);
+    zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
+
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    try {
+      bkjm.recoverUnfinalizedSegments();
+      fail("Should have failed. There should be no way of creating"
+          + " a corrupt inprogress znode");
+    } catch (IOException e) {
+      // correct behaviour
+      assertTrue("Exception different than expected", e.getMessage().contains(
+          "Invalid ledger entry,"));
+
+    } finally {
+      bkjm.close();
+    }
+  }
+
+  /**
+   * Cases can occur where we create a segment but crash before we even have the
+   * chance to write the START_SEGMENT op. If this occurs we should warn, but
+   * load as normal.
+   */
+  @Test
+  public void testEmptyInprogressLedger() throws Exception {
+    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    out = bkjm.startLogSegment(101);
+    out.close();
+    bkjm.close();
+
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm.recoverUnfinalizedSegments();
+    out = bkjm.startLogSegment(101);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(101, 200);
+
+    bkjm.close();
+  }
+
+  /**
+   * Test the case where we fail between finalizing an inprogress segment
+   * and deleting the corresponding inprogress znode: recovery should be
+   * able to re-finalize the segment without error.
+   */
+  @Test
+  public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
+    URI uri = BKJMUtil
+        .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.close();
+
+    String inprogressZNode = bkjm.inprogressZNode(1);
+    String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
+    assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
+        null));
+    assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
+
+    byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
+
+    // finalize
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm.recoverUnfinalizedSegments();
+    bkjm.close();
+
+    assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
+    assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
+        null));
+
+    zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    // should work fine
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm.recoverUnfinalizedSegments();
+    bkjm.close();
+  }
+
+  /**
+   * Tests that reading edit log file metadata from ZooKeeper handles
+   * NoNodeException: bkjm.getInputStream(fromTxId, inProgressOk) should
+   * suppress the NoNodeException and continue. HDFS-3441.
+   */
+  @Test
+  public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
+    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+    try {
+      // start new inprogress log segment with txid=1
+      // and write transactions till txid=50
+      String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50);
+
+      // start new inprogress log segment with txid=51
+      // and write transactions till txid=100
+      String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100);
+
+      // read the metadata from ZK. Here we simulate the situation where,
+      // while reading, the edit log metadata is removed by the purger thread.
+      ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper());
+      bkjm.setZooKeeper(zkspy);
+      Mockito.doThrow(
+          new KeeperException.NoNodeException(zkpath2 + " doesn't exists"))
+          .when(zkspy).getData(zkpath2, false, null);
+
+      List<EditLogLedgerMetadata> ledgerList = bkjm.getLedgerList(false);
+      assertEquals("List contains the metadata of a non-existent path.", 1,
+          ledgerList.size());
+      assertEquals("LogLedgerMetadata contains the wrong zk path.", zkpath1,
+          ledgerList.get(0).getZkPath());
+    } finally {
+      bkjm.close();
+    }
+  }
+
+  private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
+      int startTxid, int endTxid) throws IOException, KeeperException,
+      InterruptedException {
+    EditLogOutputStream out = bkjm.startLogSegment(startTxid);
+    for (long i = startTxid; i <= endTxid; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    // finalize the inprogress_1 log segment.
+    bkjm.finalizeLogSegment(startTxid, endTxid);
+    String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
+    assertNotNull(zkc.exists(zkpath1, false));
+    assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
+    return zkpath1;
   }
 }

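For reference, a minimal sketch of the BKJMUtil test fixture that the rewritten tests above depend on, inferred purely from the call sites in this hunk (stub bodies; the committed class ships elsewhere in this changeset, so treat the shapes below as assumptions):

    import java.net.URI;
    import org.apache.bookkeeper.proto.BookieServer;
    import org.apache.zookeeper.ZooKeeper;

    // Inferred outline only -- not the committed implementation.
    class BKJMUtil {
      BKJMUtil(int numBookies) {}          // size of the local bookie ensemble
      void start() throws Exception {}     // boot zookeeper plus the bookies
      void teardown() throws Exception {}  // stop everything started above

      static ZooKeeper connectZooKeeper() throws Exception {      // test ZK client
        throw new UnsupportedOperationException("stub");
      }
      static URI createJournalURI(String path) throws Exception { // bookkeeper:// URI
        throw new UnsupportedOperationException("stub");
      }
      BookieServer newBookie() throws Exception {                 // add one bookie
        throw new UnsupportedOperationException("stub");
      }
      int checkBookiesUp(int count, int seconds) throws Exception { // bookies seen
        throw new UnsupportedOperationException("stub");
      }
    }
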
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java Wed Jun  6 00:17:38 2012
@@ -34,6 +34,11 @@ public class FSEditLogTestUtil {
   public static long countTransactionsInStream(EditLogInputStream in) 
       throws IOException {
     FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
-    return validation.getNumTransactions();
+    return (validation.getEndTxId() - in.getFirstTxId()) + 1;
+  }
+
+  public static void setRuntimeForEditLog(NameNode nn, Runtime rt) {
+    nn.setRuntimeForTesting(rt);
+    nn.getFSImage().getEditLog().setRuntimeForTesting(rt);
   }
 }

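The rewritten countTransactionsInStream() above derives the count from an inclusive txid range instead of asking the validator for it. A worked example of the arithmetic (values illustrative):

    // A stream whose first txid is 1, validated to end at txid 100,
    // holds (100 - 1) + 1 = 100 transactions.
    long firstTxId = 1;
    long endTxId = 100;
    long count = (endTxId - firstTxId) + 1;  // 100
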
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml Wed Jun  6 00:17:38 2012
@@ -14,7 +14,10 @@
 
 
 -->
-<project>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.hadoop</groupId>

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c Wed Jun  6 00:17:38 2012
@@ -37,7 +37,7 @@ int dfs_truncate(const char *path, off_t
   assert(dfs);
 
   if (size != 0) {
-    return -ENOTSUP;
+    return 0;
   }
 
   int ret = dfs_unlink(path);

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Wed Jun  6 00:17:38 2012
@@ -30,6 +30,7 @@ function print_usage(){
   echo "  namenode -format     format the DFS filesystem"
   echo "  secondarynamenode    run the DFS secondary namenode"
   echo "  namenode             run the DFS namenode"
+  echo "  zkfc                 run the ZK Failover Controller daemon"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
   echo "  haadmin              run a DFS HA admin client"
@@ -56,21 +57,29 @@ shift
 
 # Determine if we're starting a secure datanode, and if so, redefine appropriate variables
 if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then
-  if [ -n "$HADOOP_SECURE_DN_PID_DIR" ]; then
-    HADOOP_PID_DIR=$HADOOP_SECURE_DN_PID_DIR
-  fi
-
-  if [ -n "$HADOOP_SECURE_DN_LOG_DIR" ]; then
-    HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR
+  if [ -n "$JSVC_HOME" ]; then
+    if [ -n "$HADOOP_SECURE_DN_PID_DIR" ]; then
+      HADOOP_PID_DIR=$HADOOP_SECURE_DN_PID_DIR
+    fi
+  
+    if [ -n "$HADOOP_SECURE_DN_LOG_DIR" ]; then
+      HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR
+    fi
+   
+    HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER
+    starting_secure_dn="true"
+  else
+    echo "It looks like you're trying to start a secure DN, but \$JSVC_HOME"\
+      "isn't set. Falling back to starting insecure DN."
   fi
- 
-  HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER
-  starting_secure_dn="true"
 fi
 
 if [ "$COMMAND" = "namenode" ] ; then
   CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
+elif [ "$COMMAND" = "zkfc" ] ; then
+  CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
 elif [ "$COMMAND" = "secondarynamenode" ] ; then
   CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
@@ -125,12 +134,12 @@ if [ "$starting_secure_dn" = "true" ]; t
   if [ "$HADOOP_PID_DIR" = "" ]; then
     HADOOP_SECURE_DN_PID="/tmp/hadoop_secure_dn.pid"
   else
-   HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
+    HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
   fi
 
   JSVC=$JSVC_HOME/jsvc
   if [ ! -f $JSVC ]; then
-    echo "JSVC_HOME is not set correctly so jsvc can not be found. Jsvc is required to run secure datanodes. "
+    echo "JSVC_HOME is not set correctly so jsvc cannot be found. Jsvc is required to run secure datanodes. "
     echo "Please download and install jsvc from http://archive.apache.org/dist/commons/daemon/binaries/ "\
       "and set JSVC_HOME to the directory containing the jsvc binary."
     exit

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh Wed Jun  6 00:17:38 2012
@@ -85,4 +85,15 @@ if [ -n "$SECONDARY_NAMENODES" ]; then
       --script "$bin/hdfs" start secondarynamenode
 fi
 
+#---------------------------------------------------------
+# ZK Failover controllers, if auto-HA is enabled
+AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)
+if [ "$(echo "$AUTOHA_ENABLED" | tr A-Z a-z)" = "true" ]; then
+  echo "Starting ZK Failover Controllers on NN hosts [$NAMENODES]"
+  "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
+    --config "$HADOOP_CONF_DIR" \
+    --hostnames "$NAMENODES" \
+    --script "$bin/hdfs" start zkfc
+fi
+
 # eof

Propchange: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1306184-1342109
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1337003-1346681

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Jun  6 00:17:38 2012
@@ -107,6 +107,8 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000;
   public static final String  DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
   public static final int     DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
+  public static final String  DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier";
+  public static final int     DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT = 4;
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource";
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
   public static final String  DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
@@ -334,8 +336,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
   
-  public static final String  DFS_FEDERATION_NAMESERVICES = "dfs.federation.nameservices";
-  public static final String  DFS_FEDERATION_NAMESERVICE_ID = "dfs.federation.nameservice.id";
+  public static final String  DFS_NAMESERVICES = "dfs.nameservices";
+  public static final String  DFS_NAMESERVICE_ID = "dfs.nameservice.id";
   public static final String  DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
   public static final int     DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
   public static final String  DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
@@ -358,4 +360,8 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
   public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
+  public static final String DFS_HA_AUTO_FAILOVER_ENABLED_KEY = "dfs.ha.automatic-failover.enabled";
+  public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
+  public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
+  public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
 }

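The new auto-failover constants above are ordinary HDFS configuration entries; a usage sketch (not part of this commit):

    Configuration conf = new HdfsConfiguration();
    // dfs.ha.automatic-failover.enabled, default false
    conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
    // dfs.ha.zkfc.port, default 8019
    conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY, 8019);
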
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Jun  6 00:17:38 2012
@@ -129,11 +129,13 @@ public class DFSOutputStream extends FSO
   private long initialFileSize = 0; // at time of file open
   private Progressable progress;
   private final short blockReplication; // replication factor of file
+  private boolean shouldSyncBlock = false; // force blocks to disk upon close
   
   private class Packet {
     long    seqno;               // sequencenumber of buffer in block
     long    offsetInBlock;       // offset in block
-    boolean lastPacketInBlock;   // is this the last packet in block?
+    private boolean lastPacketInBlock;   // is this the last packet in block?
+    boolean syncBlock;          // this packet forces the current block to disk
     int     numChunks;           // number of chunks currently in packet
     int     maxChunks;           // max chunks in packet
 
@@ -245,7 +247,7 @@ public class DFSOutputStream extends FSO
       buffer.mark();
 
       PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       header.putInBuffer(buffer);
       
       buffer.reset();
@@ -507,8 +509,15 @@ public class DFSOutputStream extends FSO
           }
 
           // write out data to remote datanode
-          blockStream.write(buf.array(), buf.position(), buf.remaining());
-          blockStream.flush();
+          try {            
+            blockStream.write(buf.array(), buf.position(), buf.remaining());
+            blockStream.flush();   
+          } catch (IOException e) {
+            // HDFS-3398: treat the primary DN as down, since the client is
+            // unable to write to it
+            errorIndex = 0;
+            throw e;
+          }
           lastPacket = System.currentTimeMillis();
           
           if (one.isHeartbeatPacket()) {  //heartbeat packet
@@ -965,6 +974,7 @@ public class DFSOutputStream extends FSO
       DatanodeInfo[] nodes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
       boolean success = false;
+      ExtendedBlock oldBlock = block;
       do {
         hasError = false;
         lastException = null;
@@ -972,9 +982,11 @@ public class DFSOutputStream extends FSO
         success = false;
 
         long startTime = System.currentTimeMillis();
-        DatanodeInfo[] w = excludedNodes.toArray(
+        DatanodeInfo[] excluded = excludedNodes.toArray(
             new DatanodeInfo[excludedNodes.size()]);
-        lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+        block = oldBlock;
+        lb = locateFollowingBlock(startTime,
+            excluded.length > 0 ? excluded : null);
         block = lb.getBlock();
         block.setNumBytes(0);
         accessToken = lb.getBlockToken();
@@ -1239,6 +1251,7 @@ public class DFSOutputStream extends FSO
       long blockSize, Progressable progress, int buffersize,
       DataChecksum checksum) throws IOException {
     this(dfsClient, src, blockSize, progress, checksum, replication);
+    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
@@ -1421,6 +1434,7 @@ public class DFSOutputStream extends FSO
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
+        currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
         bytesCurBlock = 0;
         lastFlushOffset = 0;
@@ -1440,6 +1454,24 @@ public class DFSOutputStream extends FSO
    */
   @Override
   public void hflush() throws IOException {
+    flushOrSync(false);
+  }
+
+  /**
+   * The expected semantics are that all data has been flushed out to all
+   * replicas, and that all replicas have done the posix fsync equivalent,
+   * i.e. the OS has flushed it to the disk device (but the disk may still
+   * have it in its cache).
+   * 
+   * Note that only the current block is flushed to the disk device.
+   * To guarantee durable sync across block boundaries the stream should
+   * be created with {@link CreateFlag#SYNC_BLOCK}.
+   */
+  @Override
+  public void hsync() throws IOException {
+    flushOrSync(true);
+  }
+
+  private void flushOrSync(boolean isSync) throws IOException {
     dfsClient.checkOpen();
     isClosed();
     try {
@@ -1467,7 +1499,13 @@ public class DFSOutputStream extends FSO
           assert bytesCurBlock > lastFlushOffset;
           // record the valid offset of this flush
           lastFlushOffset = bytesCurBlock;
-          waitAndQueueCurrentPacket();
+          if (isSync && currentPacket == null) {
+            // Nothing to send right now,
+            // but sync was requested.
+            // Send an empty packet
+            currentPacket = new Packet(packetSize, chunksPerPacket,
+                bytesCurBlock);
+          }
         } else {
           // We already flushed up to this offset.
           // This means that we haven't written anything since the last flush
@@ -1477,8 +1515,21 @@ public class DFSOutputStream extends FSO
           assert oldCurrentPacket == null :
             "Empty flush should not occur with a currentPacket";
 
-          // just discard the current packet since it is already been sent.
-          currentPacket = null;
+          if (isSync && bytesCurBlock > 0) {
+            // Nothing to send right now,
+            // and the block was partially written,
+            // and sync was requested.
+            // So send an empty sync packet.
+            currentPacket = new Packet(packetSize, chunksPerPacket,
+                bytesCurBlock);
+          } else {
+            // just discard the current packet since it has already been sent.
+            currentPacket = null;
+          }
+        }
+        if (currentPacket != null) {
+          currentPacket.syncBlock = isSync;
+          waitAndQueueCurrentPacket();          
         }
         // Restore state of stream. Record the last flush offset 
         // of the last full chunk that was flushed.
@@ -1530,18 +1581,6 @@ public class DFSOutputStream extends FSO
   }
 
   /**
-   * The expected semantics is all data have flushed out to all replicas 
-   * and all replicas have done posix fsync equivalent - ie the OS has 
-   * flushed it to the disk device (but the disk may have it in its cache).
-   * 
-   * Right now by default it is implemented as hflush
-   */
-  @Override
-  public synchronized void hsync() throws IOException {
-    hflush();
-  }
-
-  /**
    * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
    */
   @Deprecated
@@ -1665,6 +1704,7 @@ public class DFSOutputStream extends FSO
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
+        currentPacket.syncBlock = shouldSyncBlock;
       }
 
       flushInternal();             // flush all data to Datanodes

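A hedged client-side sketch of the hflush/hsync distinction introduced above (fragment; assumes an initialized FileSystem fs and the usual org.apache.hadoop imports):

    byte[] record = "edit".getBytes();
    FSDataOutputStream out = fs.create(new Path("/logs/edits"));
    out.write(record);
    out.hflush();  // data has left the client and is visible to new readers
    out.hsync();   // additionally, every replica fsyncs the current block
    out.close();
    // For durability across block boundaries, pass CreateFlag.SYNC_BLOCK
    // at create time (see shouldSyncBlock above).
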
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Jun  6 00:17:38 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -288,7 +289,7 @@ public class DFSUtil {
    * @return collection of nameservice Ids, or null if not specified
    */
   public static Collection<String> getNameServiceIds(Configuration conf) {
-    return conf.getTrimmedStringCollection(DFS_FEDERATION_NAMESERVICES);
+    return conf.getTrimmedStringCollection(DFS_NAMESERVICES);
   }
 
   /**
@@ -609,6 +610,14 @@ public class DFSUtil {
   public static Collection<URI> getNameServiceUris(Configuration conf,
       String... keys) {
     Set<URI> ret = new HashSet<URI>();
+    
+    // We're passed multiple possible configuration keys for any given NN or HA
+    // nameservice, and search the config in order of these keys. In order to
+    // make sure that a later config lookup (e.g. fs.defaultFS) doesn't add a
+    // URI for a config key for which we've already found a preferred entry, we
+    // keep track of non-preferred keys here.
+    Set<URI> nonPreferredUris = new HashSet<URI>();
+    
     for (String nsId : getNameServiceIds(conf)) {
       if (HAUtil.isHAEnabled(conf, nsId)) {
         // Add the logical URI of the nameservice.
@@ -619,24 +628,46 @@ public class DFSUtil {
         }
       } else {
         // Add the URI corresponding to the address of the NN.
+        boolean uriFound = false;
         for (String key : keys) {
           String addr = conf.get(concatSuffixes(key, nsId));
           if (addr != null) {
-            ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
-                NetUtils.createSocketAddr(addr)));
-            break;
+            URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
+                NetUtils.createSocketAddr(addr));
+            if (!uriFound) {
+              uriFound = true;
+              ret.add(uri);
+            } else {
+              nonPreferredUris.add(uri);
+            }
           }
         }
       }
     }
+    
     // Add the generic configuration keys.
+    boolean uriFound = false;
     for (String key : keys) {
       String addr = conf.get(key);
       if (addr != null) {
-        ret.add(createUri("hdfs", NetUtils.createSocketAddr(addr)));
-        break;
+        URI uri = createUri("hdfs", NetUtils.createSocketAddr(addr));
+        if (!uriFound) {
+          uriFound = true;
+          ret.add(uri);
+        } else {
+          nonPreferredUris.add(uri);
+        }
       }
     }
+    
+    // Add the default URI if it is an HDFS URI.
+    URI defaultUri = FileSystem.getDefaultUri(conf);
+    if (defaultUri != null &&
+        HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
+        !nonPreferredUris.contains(defaultUri)) {
+      ret.add(defaultUri);
+    }
+    
     return ret;
   }
 
@@ -676,9 +707,10 @@ public class DFSUtil {
    * @param httpsAddress -If true, and if security is enabled, returns server 
    *                      https address. If false, returns server http address.
    * @return server http or https address
+   * @throws IOException 
    */
-  public static String getInfoServer(
-      InetSocketAddress namenodeAddr, Configuration conf, boolean httpsAddress) {
+  public static String getInfoServer(InetSocketAddress namenodeAddr,
+      Configuration conf, boolean httpsAddress) throws IOException {
     boolean securityOn = UserGroupInformation.isSecurityEnabled();
     String httpAddressKey = (securityOn && httpsAddress) ? 
         DFS_NAMENODE_HTTPS_ADDRESS_KEY : DFS_NAMENODE_HTTP_ADDRESS_KEY;
@@ -695,8 +727,14 @@ public class DFSUtil {
     } else {
       suffixes = new String[2];
     }
-
-    return getSuffixedConf(conf, httpAddressKey, httpAddressDefault, suffixes);
+    String configuredInfoAddr = getSuffixedConf(conf, httpAddressKey,
+        httpAddressDefault, suffixes);
+    if (namenodeAddr != null) {
+      return substituteForWildcardAddress(configuredInfoAddr,
+          namenodeAddr.getHostName());
+    } else {
+      return configuredInfoAddr;
+    }
   }
   
 
@@ -721,7 +759,7 @@ public class DFSUtil {
       if (UserGroupInformation.isSecurityEnabled() &&
           defaultSockAddr.getAddress().isAnyLocalAddress()) {
         throw new IOException("Cannot use a wildcard address with security. " +
-                              "Must explicitly set bind address for Kerberos");
+            "Must explicitly set bind address for Kerberos");
       }
       return defaultHost + ":" + sockAddr.getPort();
     } else {
@@ -843,7 +881,7 @@ public class DFSUtil {
    * Get the nameservice Id by matching the {@code addressKey} with the
    * the address of the local node. 
    * 
-   * If {@link DFSConfigKeys#DFS_FEDERATION_NAMESERVICE_ID} is not specifically
+   * If {@link DFSConfigKeys#DFS_NAMESERVICE_ID} is not specifically
    * configured, and more than one nameservice Id is configured, this method 
    * determines the nameservice Id by matching the local node's address with the
    * configured addresses. When a match is found, it returns the nameservice Id
@@ -855,7 +893,7 @@ public class DFSUtil {
    * @throws HadoopIllegalArgumentException on error
    */
   private static String getNameServiceId(Configuration conf, String addressKey) {
-    String nameserviceId = conf.get(DFS_FEDERATION_NAMESERVICE_ID);
+    String nameserviceId = conf.get(DFS_NAMESERVICE_ID);
     if (nameserviceId != null) {
       return nameserviceId;
     }
@@ -927,7 +965,7 @@ public class DFSUtil {
     if (found > 1) { // Only one address must match the local address
       String msg = "Configuration has multiple addresses that match "
           + "local node's address. Please configure the system with "
-          + DFS_FEDERATION_NAMESERVICE_ID + " and "
+          + DFS_NAMESERVICE_ID + " and "
           + DFS_HA_NAMENODE_ID_KEY;
       throw new HadoopIllegalArgumentException(msg);
     }

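A sketch of the renamed federation key in use (illustrative values; per the change above, DFSUtil.getNameServiceIds() now reads dfs.nameservices):

    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1,ns2");  // was dfs.federation.nameservices
    for (String nsId : DFSUtil.getNameServiceIds(conf)) {
      System.out.println(nsId);  // prints ns1, then ns2
    }
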
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Jun  6 00:17:38 2012
@@ -223,12 +223,19 @@ public class DistributedFileSystem exten
 
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
-    boolean overwrite, int bufferSize, short replication, long blockSize,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return create(f, permission,
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+        blockSize, progress);
+  }
+  
+  @Override
+  public HdfsDataOutputStream create(Path f, FsPermission permission,
+    EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
-    final EnumSet<CreateFlag> cflags = overwrite?
-        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-        : EnumSet.of(CreateFlag.CREATE);
     final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
         replication, blockSize, progress, bufferSize);
     return new HdfsDataOutputStream(out, statistics);
@@ -249,6 +256,7 @@ public class DistributedFileSystem exten
   /**
    * Same as create(), except fails if parent directory doesn't already exist.
    */
+  @Override
   public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
       EnumSet<CreateFlag> flag, int bufferSize, short replication,
       long blockSize, Progressable progress) throws IOException {

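A hedged usage sketch of the EnumSet<CreateFlag> overload added above (values illustrative):

    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    HdfsDataOutputStream out = dfs.create(new Path("/data/part-0"),
        FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
        4096, (short) 3, 64L * 1024 * 1024, null);
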
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Wed Jun  6 00:17:38 2012
@@ -142,7 +142,7 @@ public class HAUtil {
     Preconditions.checkArgument(nsId != null,
         "Could not determine namespace id. Please ensure that this " +
         "machine is one of the machines listed as a NN RPC address, " +
-        "or configure " + DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID);
+        "or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID);
     
     Collection<String> nnIds = DFSUtil.getNameNodeIds(myConf, nsId);
     String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Wed Jun  6 00:17:38 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.ZKFCProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -47,6 +48,8 @@ public class HDFSPolicyProvider extends 
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
         HAServiceProtocol.class),
+    new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,
+        ZKFCProtocol.class),
     new Service(
         CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY, 
         RefreshAuthorizationPolicyProtocol.class),

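The new ZKFCProtocol entry follows the standard PolicyProvider pattern: each Service pairs an ACL key (set in hadoop-policy.xml) with the protocol interface it authorizes. A minimal sketch, with an invented protocol and ACL key:

    import org.apache.hadoop.security.authorize.PolicyProvider;
    import org.apache.hadoop.security.authorize.Service;

    public class ExamplePolicyProvider extends PolicyProvider {
      /** Invented protocol, standing in for ZKFCProtocol etc. */
      public interface ExampleProtocol {}

      @Override
      public Service[] getServices() {
        return new Service[] {
            // ACL config key -> protocol whose callers it authorizes
            new Service("security.example.protocol.acl", ExampleProtocol.class)
        };
      }
    }
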
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Wed Jun  6 00:17:38 2012
@@ -63,7 +63,7 @@ public class HdfsConfiguration extends C
   }
 
   private static void deprecate(String oldKey, String newKey) {
-    Configuration.addDeprecation(oldKey, new String[]{newKey});
+    Configuration.addDeprecation(oldKey, newKey);
   }
 
   private static void addDeprecatedKeys() {
@@ -102,5 +102,7 @@ public class HdfsConfiguration extends C
     deprecate("dfs.block.size", DFSConfigKeys.DFS_BLOCK_SIZE_KEY);
     deprecate("dfs.datanode.max.xcievers", DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY);
     deprecate("io.bytes.per.checksum", DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY);
+    deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES);
+    deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID);
   }
 }

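The effect of the two new deprecation entries, sketched against the Configuration API (key strings as in the diff; the printed value is what the deprecation table should yield):

    import org.apache.hadoop.conf.Configuration;

    public class DeprecationSketch {
      public static void main(String[] args) {
        // HdfsConfiguration registers this mapping in a static block.
        Configuration.addDeprecation("dfs.federation.nameservices",
            "dfs.nameservices");

        Configuration conf = new Configuration(false);
        conf.set("dfs.federation.nameservices", "ns1,ns2"); // old key
        // Reads of the new key resolve to the value supplied under the
        // old one, with a deprecation warning logged once.
        System.out.println(conf.get("dfs.nameservices"));   // ns1,ns2
      }
    }
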
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Wed Jun  6 00:17:38 2012
@@ -214,6 +214,17 @@ public class Block implements Writable, 
     }
     return compareTo((Block)o) == 0;
   }
+  
+  /**
+   * @return true if the two blocks have the same block ID and the same
+   * generation stamp, or if both blocks are null.
+   */
+  public static boolean matchingIdAndGenStamp(Block a, Block b) {
+    if (a == b) return true; // same block, or both null
+    if (a == null || b == null) return false; // only one null
+    return a.blockId == b.blockId &&
+           a.generationStamp == b.generationStamp;
+  }
 
   @Override // Object
   public int hashCode() {

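Usage sketch for the helper added above; its point is that either argument may be null:

    import org.apache.hadoop.hdfs.protocol.Block;

    class MatchSketch {
      static boolean sameIdentity(Block stored, Block reported) {
        // true:  both null, or equal blockId and generationStamp
        // false: exactly one null, or either field differs
        return Block.matchingIdAndGenStamp(stored, reported);
      }
    }
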
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Jun  6 00:17:38 2012
@@ -309,6 +309,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public LocatedBlock addBlock(String src, String clientName,
       ExtendedBlock previous, DatanodeInfo[] excludeNodes)
       throws AccessControlException, FileNotFoundException,
@@ -362,6 +363,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink 
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public boolean complete(String src, String clientName, ExtendedBlock last)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException;

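@Idempotent marks an RPC that the client-side retry and failover machinery may safely re-invoke when a response is lost; unannotated methods are assumed unsafe to replay. An illustrative (invented) protocol:

    import java.io.IOException;
    import org.apache.hadoop.io.retry.Idempotent;

    public interface ExampleProtocol {
      /** Replay-safe: retrying a lost call yields the same answer. */
      @Idempotent
      long getValue(String key) throws IOException;

      /** Unannotated: a blind retry could apply the mutation twice. */
      void increment(String key) throws IOException;
    }
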
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Jun  6 00:17:38 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /**
  * This class represents the primary identifier for a Datanode.
@@ -45,23 +44,6 @@ public class DatanodeID implements Compa
   protected int infoPort;      // info server port
   protected int ipcPort;       // IPC server port
 
-  public DatanodeID(String ipAddr, int xferPort) {
-    this(ipAddr, "", "", xferPort,
-        DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
-        DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
-  }
-
-  public DatanodeID(String ipAddr, String hostName, int xferPort) {
-    this(ipAddr, hostName, "", xferPort,
-        DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
-        DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
-  }
-
-  /**
-   * DatanodeID copy constructor
-   * 
-   * @param from
-   */
   public DatanodeID(DatanodeID from) {
     this(from.getIpAddr(),
         from.getHostName(),
@@ -72,7 +54,7 @@ public class DatanodeID implements Compa
   }
   
   /**
-   * Create DatanodeID
+   * Create a DatanodeID
    * @param ipAddr IP
    * @param hostName hostname
    * @param storageID data storage ID
@@ -94,22 +76,6 @@ public class DatanodeID implements Compa
     this.ipAddr = ipAddr;
   }
 
-  public void setHostName(String hostName) {
-    this.hostName = hostName;
-  }
-
-  public void setXferPort(int xferPort) {
-    this.xferPort = xferPort;
-  }
-
-  public void setInfoPort(int infoPort) {
-    this.infoPort = infoPort;
-  }
-  
-  public void setIpcPort(int ipcPort) {
-    this.ipcPort = ipcPort;
-  }
-
   public void setStorageID(String storageID) {
     this.storageID = storageID;
   }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Jun  6 00:17:38 2012
@@ -22,11 +22,11 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
-/****************************************************
- * A LocatedBlock is a pair of Block, DatanodeInfo[]
- * objects.  It tells where to find a Block.
- * 
- ****************************************************/
+/**
+ * Associates a block with the Datanodes that contain its replicas
+ * and other block metadata (e.g. the file offset associated with this
+ * block, whether it is corrupt, its security token, etc.).
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class LocatedBlock {
@@ -40,19 +40,6 @@ public class LocatedBlock {
   private boolean corrupt;
   private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
 
-  public LocatedBlock() {
-    this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
-  }
-  
-
-  public LocatedBlock(ExtendedBlock eb) {
-    this(eb, new DatanodeInfo[0], 0L, false);
-  }
-
-  public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
-    this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
-  }
-
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     this(b, locs, -1, false); // startOffset is unknown
   }
@@ -81,14 +68,10 @@ public class LocatedBlock {
     this.blockToken = token;
   }
 
-  /**
-   */
   public ExtendedBlock getBlock() {
     return b;
   }
 
-  /**
-   */
   public DatanodeInfo[] getLocations() {
     return locs;
   }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Wed Jun  6 00:17:38 2012
@@ -105,8 +105,9 @@ public class LocatedBlocks {
   * @return index of the block if found, or a negative value otherwise.
    */
   public int findBlock(long offset) {
-    // create fake block of size 1 as a key
-    LocatedBlock key = new LocatedBlock();
+    // create a fake block of length 1 as the search key
+    LocatedBlock key = new LocatedBlock(
+        new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
     key.setStartOffset(offset);
     key.getBlock().setNumBytes(1);
     Comparator<LocatedBlock> comp = 

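The search key works because the comparator (truncated above) treats overlapping offset ranges as equal, which is why the fake block is given a nonzero length via setNumBytes(1). The same pattern on plain arrays, as a self-contained sketch:

    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    class OffsetSearchSketch {
      /** Elements are {startOffset, length}, sorted and non-overlapping. */
      static int findByOffset(List<long[]> blocks, long offset) {
        long[] key = { offset, 1 }; // fake block of length 1 at the target
        Comparator<long[]> comp = new Comparator<long[]>() {
          @Override
          public int compare(long[] a, long[] b) {
            if (a[0] + a[1] <= b[0]) return -1; // a ends before b starts
            if (b[0] + b[1] <= a[0]) return 1;  // a starts after b ends
            return 0;                           // overlap counts as a match
          }
        };
        return Collections.binarySearch(blocks, key, comp); // index or < 0
      }
    }
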
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Wed Jun  6 00:17:38 2012
@@ -40,6 +40,7 @@ public class PacketHeader {
       .setSeqno(0)
       .setLastPacketInBlock(false)
       .setDataLen(0)
+      .setSyncBlock(false)
       .build().getSerializedSize();
   public static final int PKT_HEADER_LEN =
     6 + PROTO_SIZE;
@@ -51,13 +52,14 @@ public class PacketHeader {
   }
 
   public PacketHeader(int packetLen, long offsetInBlock, long seqno,
-                      boolean lastPacketInBlock, int dataLen) {
+                      boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
     this.packetLen = packetLen;
     proto = PacketHeaderProto.newBuilder()
       .setOffsetInBlock(offsetInBlock)
       .setSeqno(seqno)
       .setLastPacketInBlock(lastPacketInBlock)
       .setDataLen(dataLen)
+      .setSyncBlock(syncBlock)
       .build();
   }
 
@@ -81,6 +83,10 @@ public class PacketHeader {
     return packetLen;
   }
 
+  public boolean getSyncBlock() {
+    return proto.getSyncBlock();
+  }
+
   @Override
   public String toString() {
     return "PacketHeader with packetLen=" + packetLen +

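Constructing a header with the widened constructor, as a sketch; the numbers are placeholders, and syncBlock appears to request that datanodes sync the block to disk after this packet (durable-sync support):

    import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;

    class PacketHeaderSketch {
      static PacketHeader example() {
        return new PacketHeader(
            4 + 1024, // packetLen
            0L,       // offsetInBlock
            7L,       // seqno
            false,    // lastPacketInBlock
            1024,     // dataLen
            true);    // syncBlock
      }
    }
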
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java Wed Jun  6 00:17:38 2012
@@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.protocolP
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
 import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
 
+@KerberosInfo(
+    serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
 @ProtocolInfo(
     protocolName = "org.apache.hadoop.tools.GetUserMappingsProtocol", 
     protocolVersion = 1)

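@KerberosInfo tells a secured client which configuration key names the principal to expect from the server when authenticating the connection; @ProtocolInfo pins the protocol's wire name and version. The shape of such an interface, with invented names (real PB interfaces additionally extend the protobuf-generated BlockingInterface for their service):

    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.ipc.ProtocolInfo;
    import org.apache.hadoop.security.KerberosInfo;

    @KerberosInfo(
        serverPrincipal = CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
    @ProtocolInfo(
        protocolName = "org.example.ExampleProtocol",
        protocolVersion = 1)
    public interface ExampleProtocolPB {
    }
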
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java Wed Jun  6 00:17:38 2012
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
@@ -104,6 +106,20 @@ public class NamenodeProtocolServerSideT
     }
     return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build();
   }
+  
+  @Override
+  public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
+      RpcController unused, GetMostRecentCheckpointTxIdRequestProto request)
+      throws ServiceException {
+    long txid;
+    try {
+      txid = impl.getMostRecentCheckpointTxId();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build();
+  }
+
 
   @Override
   public RollEditLogResponseProto rollEditLog(RpcController unused,

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Wed Jun  6 00:17:38 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
@@ -120,6 +121,16 @@ public class NamenodeProtocolTranslatorP
   }
 
   @Override
+  public long getMostRecentCheckpointTxId() throws IOException {
+    try {
+      return rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER,
+          GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public CheckpointSignature rollEditLog() throws IOException {
     try {
       return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,

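This client-side method mirrors the server-side translator earlier in the commit: protobuf service stubs can only throw ServiceException, so IOExceptions are wrapped on the way out of the server and unwrapped on the way back into the client. The round trip in miniature, with invented stub interfaces:

    import java.io.IOException;
    import com.google.protobuf.ServiceException;

    class TranslatorSketch {
      interface Impl { long getTxId() throws IOException; }      // server target
      interface Stub { long getTxId() throws ServiceException; } // generated stub

      /** Server side: Java exception out to the wire. */
      static long serve(Impl impl) throws ServiceException {
        try {
          return impl.getTxId();
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }

      /** Client side: wire exception back to the Java contract. */
      static long call(Stub stub) throws IOException {
        try {
          return stub.getTxId();
        } catch (ServiceException e) {
          // ProtobufHelper.getRemoteException does this more carefully.
          throw (IOException) e.getCause();
        }
      }
    }
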
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Jun  6 00:17:38 2012
@@ -254,11 +254,11 @@ public class PBHelper {
   public static BlockWithLocationsProto convert(BlockWithLocations blk) {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
-        .addAllDatanodeIDs(Arrays.asList(blk.getDatanodes())).build();
+        .addAllStorageIDs(Arrays.asList(blk.getStorageIDs())).build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
-    return new BlockWithLocations(convert(b.getBlock()), b.getDatanodeIDsList()
+    return new BlockWithLocations(convert(b.getBlock()), b.getStorageIDsList()
         .toArray(new String[0]));
   }
 

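A converter pair like this should round-trip, and after the change it is storage IDs, not datanode names, that survive the trip in both directions. A sketch (the Block argument and the DS-style IDs are placeholders):

    import java.util.Arrays;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;

    class RoundTripSketch {
      static void check(Block blk) {
        BlockWithLocations before =
            new BlockWithLocations(blk, new String[] { "DS-1", "DS-2" });
        BlockWithLocations after = PBHelper.convert(PBHelper.convert(before));
        assert Arrays.equals(before.getStorageIDs(), after.getStorageIDs());
      }
    }
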
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Jun  6 00:17:38 2012
@@ -205,6 +205,7 @@ public class Balancer {
   private Map<Block, BalancerBlock> globalBlockList
                  = new HashMap<Block, BalancerBlock>();
   private MovedBlocks movedBlocks = new MovedBlocks();
+  // Map storage IDs to BalancerDatanodes
   private Map<String, BalancerDatanode> datanodes
                  = new HashMap<String, BalancerDatanode>();
   
@@ -262,9 +263,9 @@ public class Balancer {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Decided to move block "+ block.getBlockId()
                     +" with a length of "+StringUtils.byteDesc(block.getNumBytes())
-                    + " bytes from " + source.getName() 
-                    + " to " + target.getName()
-                    + " using proxy source " + proxySource.getName() );
+                    + " from " + source.getDisplayName()
+                    + " to " + target.getDisplayName()
+                    + " using proxy source " + proxySource.getDisplayName() );
               }
               return true;
             }
@@ -317,15 +318,15 @@ public class Balancer {
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
         LOG.info( "Moving block " + block.getBlock().getBlockId() +
-              " from "+ source.getName() + " to " +
-              target.getName() + " through " +
-              proxySource.getName() +
+              " from "+ source.getDisplayName() + " to " +
+              target.getDisplayName() + " through " +
+              proxySource.getDisplayName() +
              " succeeded." );
       } catch (IOException e) {
         LOG.warn("Error moving block "+block.getBlockId()+
-            " from " + source.getName() + " to " +
-            target.getName() + " through " +
-            proxySource.getName() +
+            " from " + source.getDisplayName() + " to " +
+            target.getDisplayName() + " through " +
+            proxySource.getDisplayName() +
             ": "+e.getMessage());
       } finally {
         IOUtils.closeStream(out);
@@ -378,7 +379,8 @@ public class Balancer {
         public void run() {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Starting moving "+ block.getBlockId() +
-                " from " + proxySource.getName() + " to " + target.getName());
+                " from " + proxySource.getDisplayName() + " to " +
+                target.getDisplayName());
           }
           dispatch();
         }
@@ -475,7 +477,7 @@ public class Balancer {
     
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "[" + getName()
+      return getClass().getSimpleName() + "[" + datanode
           + ", utilization=" + utilization + "]";
     }
 
@@ -507,8 +509,8 @@ public class Balancer {
     }
     
    /** Get the display name of the datanode, for use in messages */
-    protected String getName() {
-      return datanode.getName();
+    protected String getDisplayName() {
+      return datanode.toString();
     }
     
     /* Get the storage id of the datanode */
@@ -620,8 +622,8 @@ public class Balancer {
         
           synchronized (block) {
             // update locations
-            for ( String location : blk.getDatanodes() ) {
-              BalancerDatanode datanode = datanodes.get(location);
+            for ( String storageID : blk.getStorageIDs() ) {
+              BalancerDatanode datanode = datanodes.get(storageID);
               if (datanode != null) { // not an unknown datanode
                 block.addLocation(datanode);
               }
@@ -831,7 +833,7 @@ public class Balancer {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
         } else {
           assert(isOverUtilized(datanodeS)) :
-            datanodeS.getName()+ "is not an overUtilized node";
+            datanodeS.getDisplayName() + " is not an overUtilized node";
           this.overUtilizedDatanodes.add((Source)datanodeS);
           overLoadedBytes += (long)((datanodeS.utilization-avg
               -threshold)*datanodeS.datanode.getCapacity()/100.0);
@@ -842,7 +844,7 @@ public class Balancer {
           this.belowAvgUtilizedDatanodes.add(datanodeS);
         } else {
           assert isUnderUtilized(datanodeS) : "isUnderUtilized("
-              + datanodeS.getName() + ")=" + isUnderUtilized(datanodeS)
+              + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
               + ", utilization=" + datanodeS.utilization; 
           this.underUtilizedDatanodes.add(datanodeS);
           underLoadedBytes += (long)((avg-threshold-

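The renames above separate two roles that getName() used to conflate: the storage ID stays the stable key of the datanodes map (matching the storage IDs now carried in block locations), while getDisplayName() exists only for log and assertion text. Schematically, with stub types:

    import java.util.HashMap;
    import java.util.Map;

    class NamingSketch {
      static class Node {
        final String storageID;   // stable and unique: the map key
        final String displayName; // human-readable: messages only
        Node(String id, String name) { storageID = id; displayName = name; }
      }

      final Map<String, Node> byStorageId = new HashMap<String, Node>();

      void register(Node n) {
        // Block locations arrive as storage IDs, so lookups must key on
        // them; keying on the display name would miss every block.
        byStorageId.put(n.storageID, n);
      }
    }
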
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Wed Jun  6 00:17:38 2012
@@ -200,7 +200,7 @@ class NameNodeConnector {
           Thread.sleep(keyUpdaterInterval);
         }
       } catch (InterruptedException e) {
-        LOG.info("InterruptedException in block key updater thread", e);
+        LOG.debug("InterruptedException in block key updater thread", e);
       } catch (Throwable e) {
         LOG.error("Exception in block key updater thread", e);
         shouldRun = false;

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Wed Jun  6 00:17:38 2012
@@ -19,9 +19,6 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.fs.ContentSummary;
 
 /** 
@@ -31,19 +28,24 @@ import org.apache.hadoop.fs.ContentSumma
 public interface BlockCollection {
   /**
    * Get the last block of the collection.
-   * Make sure it has the right type.
    */
-  public <T extends BlockInfo> T getLastBlock() throws IOException;
+  public BlockInfo getLastBlock() throws IOException;
 
   /** 
    * Get content summary.
    */
   public ContentSummary computeContentSummary();
 
-  /** @return the number of blocks */ 
+  /**
+   * @return the number of blocks
+   */ 
   public int numBlocks();
 
+  /**
+   * Get the blocks.
+   */
   public BlockInfo[] getBlocks();
+
   /**
    * Get preferred block size for the collection 
    * @return preferred block size in bytes
@@ -57,7 +59,7 @@ public interface BlockCollection {
   public short getReplication();
 
   /**
-   *  Get name of collection.
+   * Get the name of the collection.
    */
   public String getName();
 }

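On dropping the generic signature of getLastBlock(): with <T extends BlockInfo> T, the caller chooses T, so the implementation can only cast blindly, and a mismatch surfaces as a ClassCastException far from its cause. The difference, sketched with stub types:

    class GenericsSketch {
      static class BlockInfo {}
      static class BlockInfoUnderConstruction extends BlockInfo {}

      interface OldStyle {
        <T extends BlockInfo> T getLastBlock(); // T inferred at the call site
      }

      interface NewStyle {
        BlockInfo getLastBlock(); // callers downcast explicitly, and visibly
      }

      static void use(OldStyle c) {
        // Compiles even when the last block is not under construction;
        // fails only later, at runtime.
        BlockInfoUnderConstruction uc = c.getLastBlock();
      }

      static void use(NewStyle c) {
        BlockInfo b = c.getLastBlock();
        if (b instanceof BlockInfoUnderConstruction) { // visible, checked cast
          BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction) b;
        }
      }
    }
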
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jun  6 00:17:38 2012
@@ -437,7 +437,7 @@ public class BlockManager {
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  private boolean commitBlock(final BlockInfoUnderConstruction block,
+  private static boolean commitBlock(final BlockInfoUnderConstruction block,
       final Block commitBlock) throws IOException {
     if (block.getBlockUCState() == BlockUCState.COMMITTED)
       return false;


