Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Wed Jun 6 00:17:38 2012 @@ -18,53 +18,31 @@ package org.apache.hadoop.contrib.bkjournal; import static org.junit.Assert.*; - -import java.net.URI; -import java.util.Collections; -import java.util.Arrays; -import java.util.List; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.bookkeeper.util.LocalBookKeeper; - -import java.io.RandomAccessFile; -import java.io.File; -import java.io.FilenameFilter; -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.security.SecurityUtil; +import static org.mockito.Mockito.spy; import org.junit.Test; import org.junit.Before; import org.junit.After; import org.junit.BeforeClass; import org.junit.AfterClass; +import org.mockito.Mockito; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; import org.apache.hadoop.hdfs.server.namenode.JournalManager; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.WatchedEvent; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; - -import com.google.common.collect.ImmutableList; - -import java.util.zip.CheckedInputStream; -import java.util.zip.Checksum; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,79 +51,26 @@ public class TestBookKeeperJournalManage static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class); private static final long DEFAULT_SEGMENT_SIZE = 1000; - private static final String zkEnsemble = "localhost:2181"; - private static Thread bkthread; protected static Configuration conf = new Configuration(); private ZooKeeper zkc; - - private static ZooKeeper connectZooKeeper(String ensemble) - throws IOException, KeeperException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - - ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() { - public void process(WatchedEvent event) { - if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { - latch.countDown(); - } - } - }); - if (!latch.await(3, TimeUnit.SECONDS)) { - throw new IOException("Zookeeper took too long to connect"); - } - 
return zkc; - } + private static BKJMUtil bkutil; + static int numBookies = 3; @BeforeClass public static void setupBookkeeper() throws Exception { - final int numBookies = 5; - bkthread = new Thread() { - public void run() { - try { - String[] args = new String[1]; - args[0] = String.valueOf(numBookies); - LOG.info("Starting bk"); - LocalBookKeeper.main(args); - } catch (InterruptedException e) { - // go away quietly - } catch (Exception e) { - LOG.error("Error starting local bk", e); - } - } - }; - bkthread.start(); - - if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) { - throw new Exception("Error starting zookeeper/bookkeeper"); - } + bkutil = new BKJMUtil(numBookies); + bkutil.start(); + } - ZooKeeper zkc = connectZooKeeper(zkEnsemble); - try { - boolean up = false; - for (int i = 0; i < 10; i++) { - try { - List children = zkc.getChildren("/ledgers/available", - false); - if (children.size() == numBookies) { - up = true; - break; - } - } catch (KeeperException e) { - // ignore - } - Thread.sleep(1000); - } - if (!up) { - throw new IOException("Not enough bookies started"); - } - } finally { - zkc.close(); - } + @AfterClass + public static void teardownBookkeeper() throws Exception { + bkutil.teardown(); } - + @Before public void setup() throws Exception { - zkc = connectZooKeeper(zkEnsemble); + zkc = BKJMUtil.connectZooKeeper(); } @After @@ -153,19 +78,10 @@ public class TestBookKeeperJournalManage zkc.close(); } - @AfterClass - public static void teardownBookkeeper() throws Exception { - if (bkthread != null) { - bkthread.interrupt(); - bkthread.join(); - } - } - @Test public void testSimpleWrite() throws Exception { BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite")); - long txid = 1; + BKJMUtil.createJournalURI("/hdfsjournal-simplewrite")); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = 
FSEditLogTestUtil.getNoOpInstance(); @@ -178,14 +94,13 @@ public class TestBookKeeperJournalManage String zkpath = bkjm.finalizedLedgerZNode(1, 100); assertNotNull(zkc.exists(zkpath, false)); - assertNull(zkc.exists(bkjm.inprogressZNode(), false)); + assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); } @Test public void testNumberOfTransactions() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount")); - long txid = 1; + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-txncount")); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -201,8 +116,8 @@ public class TestBookKeeperJournalManage @Test public void testNumberOfTransactionsWithGaps() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-gaps")); long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; @@ -214,9 +129,11 @@ public class TestBookKeeperJournalManage } out.close(); bkjm.finalizeLogSegment(start, txid-1); - assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); + assertNotNull( + zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); } - zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); + zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, + DEFAULT_SEGMENT_SIZE*2), -1); long numTrans = bkjm.getNumberOfTransactions(1, true); assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); @@ -234,8 +151,8 @@ public class TestBookKeeperJournalManage @Test public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { - BookKeeperJournalManager bkjm = new 
BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd")); long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; @@ -248,7 +165,8 @@ public class TestBookKeeperJournalManage out.close(); bkjm.finalizeLogSegment(start, (txid-1)); - assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false)); + assertNotNull( + zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false)); } long start = txid; EditLogOutputStream out = bkjm.startLogSegment(start); @@ -272,8 +190,8 @@ public class TestBookKeeperJournalManage */ @Test public void testWriteRestartFrom1() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1")); long txid = 1; long start = txid; EditLogOutputStream out = bkjm.startLogSegment(txid); @@ -327,25 +245,26 @@ public class TestBookKeeperJournalManage @Test public void testTwoWriters() throws Exception { long start = 1; - BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter")); - BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter")); + BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); + BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); EditLogOutputStream out1 = bkjm1.startLogSegment(start); try { - EditLogOutputStream out2 = bkjm2.startLogSegment(start); + bkjm2.startLogSegment(start); fail("Shouldn't have been 
able to open the second writer"); } catch (IOException ioe) { LOG.info("Caught exception as expected", ioe); + }finally{ + out1.close(); } } @Test public void testSimpleRead() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread")); - long txid = 1; + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-simpleread")); final long numTransactions = 10000; EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= numTransactions; i++) { @@ -368,10 +287,9 @@ public class TestBookKeeperJournalManage @Test public void testSimpleRecovery() throws Exception { - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery")); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery")); EditLogOutputStream out = bkjm.startLogSegment(1); - long txid = 1; for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); @@ -385,11 +303,372 @@ public class TestBookKeeperJournalManage assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); - assertNotNull(zkc.exists(bkjm.inprogressZNode(), false)); + assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false)); bkjm.recoverUnfinalizedSegments(); assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); - assertNull(zkc.exists(bkjm.inprogressZNode(), false)); + assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); + } + + /** + * Test that if enough bookies fail to prevent an ensemble, + * writes to bookkeeper will fail. Test that once an ensemble + * is available again, it can continue to write. 
+ */ + @Test + public void testAllBookieFailure() throws Exception { + BookieServer bookieToFail = bkutil.newBookie(); + BookieServer replacementBookie = null; + + try { + int ensembleSize = numBookies + 1; + assertEquals("New bookie didn't start", + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); + + // ensure that the journal manager has to use all bookies, + // so that a failure will fail the journal manager + Configuration conf = new Configuration(); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + ensembleSize); + long txid = 1; + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure")); + EditLogOutputStream out = bkjm.startLogSegment(txid); + + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + bookieToFail.shutdown(); + assertEquals("New bookie didn't die", + numBookies, bkutil.checkBookiesUp(numBookies, 10)); + + try { + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + fail("should not get to this stage"); + } catch (IOException ioe) { + LOG.debug("Error writing to bookkeeper", ioe); + assertTrue("Invalid exception message", + ioe.getMessage().contains("Failed to write to bookkeeper")); + } + replacementBookie = bkutil.newBookie(); + + assertEquals("New bookie didn't start", + numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10)); + bkjm.recoverUnfinalizedSegments(); + out = bkjm.startLogSegment(txid); + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + + out.setReadyToFlush(); + out.flush(); + + } catch (Exception e) 
{ + LOG.error("Exception in test", e); + throw e; + } finally { + if (replacementBookie != null) { + replacementBookie.shutdown(); + } + bookieToFail.shutdown(); + + if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { + LOG.warn("Not all bookies from this test shut down, expect errors"); + } + } + } + + /** + * Test that a BookKeeper JM can continue to work across the + * failure of a bookie. This should be handled transparently + * by bookkeeper. + */ + @Test + public void testOneBookieFailure() throws Exception { + BookieServer bookieToFail = bkutil.newBookie(); + BookieServer replacementBookie = null; + + try { + int ensembleSize = numBookies + 1; + assertEquals("New bookie didn't start", + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); + + // ensure that the journal manager has to use all bookies, + // so that a failure will fail the journal manager + Configuration conf = new Configuration(); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + ensembleSize); + long txid = 1; + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure")); + EditLogOutputStream out = bkjm.startLogSegment(txid); + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + + replacementBookie = bkutil.newBookie(); + assertEquals("replacement bookie didn't start", + ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10)); + bookieToFail.shutdown(); + assertEquals("New bookie didn't die", + ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10)); + + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + } catch (Exception e) { + 
LOG.error("Exception in test", e); + throw e; + } finally { + if (replacementBookie != null) { + replacementBookie.shutdown(); + } + bookieToFail.shutdown(); + + if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) { + LOG.warn("Not all bookies from this test shut down, expect errors"); + } + } + } + + /** + * If a journal manager has an empty inprogress node, ensure that we throw an + * error, as this should not be possible, and some third party has corrupted + * the zookeeper state + */ + @Test + public void testEmptyInprogressNode() throws Exception { + URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress"); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + + EditLogOutputStream out = bkjm.startLogSegment(1); + for (long i = 1; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, 100); + + out = bkjm.startLogSegment(101); + out.close(); + bkjm.close(); + String inprogressZNode = bkjm.inprogressZNode(101); + zkc.setData(inprogressZNode, new byte[0], -1); + + bkjm = new BookKeeperJournalManager(conf, uri); + try { + bkjm.recoverUnfinalizedSegments(); + fail("Should have failed. 
There should be no way of creating" + + " an empty inprogess znode"); + } catch (IOException e) { + // correct behaviour + assertTrue("Exception different than expected", e.getMessage().contains( + "Invalid ledger entry,")); + } finally { + bkjm.close(); + } + } + + /** + * If a journal manager has a corrupt inprogress node, ensure that we throw + * an error, as this should not be possible, and some third party has + * corrupted the zookeeper state. + */ + @Test + public void testCorruptInprogressNode() throws Exception { + URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress"); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + + EditLogOutputStream out = bkjm.startLogSegment(1); + for (long i = 1; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, 100); + + out = bkjm.startLogSegment(101); + out.close(); + bkjm.close(); + + String inprogressZNode = bkjm.inprogressZNode(101); + zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1); + + bkjm = new BookKeeperJournalManager(conf, uri); + try { + bkjm.recoverUnfinalizedSegments(); + fail("Should have failed. There should be no way of creating" + + " an empty inprogess znode"); + } catch (IOException e) { + // correct behaviour + assertTrue("Exception different than expected", e.getMessage().contains( + "Invalid ledger entry,")); + + } finally { + bkjm.close(); + } + } + + /** + * Cases can occur where we create a segment but crash before we even have the + * chance to write the START_SEGMENT op. 
If this occurs we should warn, but + * load as normal + */ + @Test + public void testEmptyInprogressLedger() throws Exception { + URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger"); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + + EditLogOutputStream out = bkjm.startLogSegment(1); + for (long i = 1; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, 100); + + out = bkjm.startLogSegment(101); + out.close(); + bkjm.close(); + + bkjm = new BookKeeperJournalManager(conf, uri); + bkjm.recoverUnfinalizedSegments(); + out = bkjm.startLogSegment(101); + for (long i = 1; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(101, 200); + + bkjm.close(); + } + + /** + * Test that recovery still succeeds if we fail between finalizing an + * inprogress segment and deleting the corresponding inprogress znode. 
+ */ + @Test + public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { + URI uri = BKJMUtil + .createJournalURI("/hdfsjournal-refinalizeInprogressLedger"); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + + EditLogOutputStream out = bkjm.startLogSegment(1); + for (long i = 1; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.close(); + + String inprogressZNode = bkjm.inprogressZNode(1); + String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100); + assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode, + null)); + assertNull("finalized znode exists", zkc.exists(finalizedZNode, null)); + + byte[] inprogressData = zkc.getData(inprogressZNode, false, null); + + // finalize + bkjm = new BookKeeperJournalManager(conf, uri); + bkjm.recoverUnfinalizedSegments(); + bkjm.close(); + + assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null)); + assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode, + null)); + + zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // should work fine + bkjm = new BookKeeperJournalManager(conf, uri); + bkjm.recoverUnfinalizedSegments(); + bkjm.close(); + } + + /** + * Tests that the edit log file meta data reading from ZooKeeper should be + * able to handle the NoNodeException. bkjm.getInputStream(fromTxId, + * inProgressOk) should suppress the NoNodeException and continue. HDFS-3441. 
+ */ + @Test + public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception { + URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile"); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + try { + // start new inprogress log segment with txid=1 + // and write transactions till txid=50 + String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50); + + // start new inprogress log segment with txid=51 + // and write transactions till txid=100 + String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100); + + // read the metadata from ZK. Here simulating the situation + // when reading,the edit log metadata can be removed by purger thread. + ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper()); + bkjm.setZooKeeper(zkspy); + Mockito.doThrow( + new KeeperException.NoNodeException(zkpath2 + " doesn't exists")) + .when(zkspy).getData(zkpath2, false, null); + + List ledgerList = bkjm.getLedgerList(false); + assertEquals("List contains the metadata of non exists path.", 1, + ledgerList.size()); + assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1, + ledgerList.get(0).getZkPath()); + } finally { + bkjm.close(); + } + } + + private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm, + int startTxid, int endTxid) throws IOException, KeeperException, + InterruptedException { + EditLogOutputStream out = bkjm.startLogSegment(startTxid); + for (long i = startTxid; i <= endTxid; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + // finalize the inprogress_1 log segment. 
+ bkjm.finalizeLogSegment(startTxid, endTxid); + String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid); + assertNotNull(zkc.exists(zkpath1, false)); + assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false)); + return zkpath1; } } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java Wed Jun 6 00:17:38 2012 @@ -34,6 +34,11 @@ public class FSEditLogTestUtil { public static long countTransactionsInStream(EditLogInputStream in) throws IOException { FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); - return validation.getNumTransactions(); + return (validation.getEndTxId() - in.getFirstTxId()) + 1; + } + + public static void setRuntimeForEditLog(NameNode nn, Runtime rt) { + nn.setRuntimeForTesting(rt); + nn.getFSImage().getEditLog().setRuntimeForTesting(rt); } } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml Wed Jun 6 00:17:38 2012 @@ -14,7 +14,10 @@ --> - + 4.0.0 org.apache.hadoop Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c Wed Jun 6 00:17:38 2012 @@ -37,7 +37,7 @@ int dfs_truncate(const char *path, off_t assert(dfs); if (size != 0) { - return -ENOTSUP; + return 0; } int ret = dfs_unlink(path); Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Wed Jun 6 00:17:38 2012 @@ -30,6 +30,7 @@ function print_usage(){ echo " namenode -format format the DFS filesystem" echo " secondarynamenode run the DFS secondary namenode" echo " namenode run the DFS namenode" + echo " zkfc run the ZK Failover Controller daemon" echo " datanode run a DFS datanode" echo " dfsadmin run a DFS admin client" echo " haadmin run a DFS HA admin client" @@ -56,21 
+57,29 @@ shift # Determine if we're starting a secure datanode, and if so, redefine appropriate variables if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then - if [ -n "$HADOOP_SECURE_DN_PID_DIR" ]; then - HADOOP_PID_DIR=$HADOOP_SECURE_DN_PID_DIR - fi - - if [ -n "$HADOOP_SECURE_DN_LOG_DIR" ]; then - HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR + if [ -n "$JSVC_HOME" ]; then + if [ -n "$HADOOP_SECURE_DN_PID_DIR" ]; then + HADOOP_PID_DIR=$HADOOP_SECURE_DN_PID_DIR + fi + + if [ -n "$HADOOP_SECURE_DN_LOG_DIR" ]; then + HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR + fi + + HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER + starting_secure_dn="true" + else + echo "It looks like you're trying to start a secure DN, but \$JSVC_HOME"\ + "isn't set. Falling back to starting insecure DN." fi - - HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER - starting_secure_dn="true" fi if [ "$COMMAND" = "namenode" ] ; then CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode' HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS" +elif [ "$COMMAND" = "zkfc" ] ; then + CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController' + HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS" elif [ "$COMMAND" = "secondarynamenode" ] ; then CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode' HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS" @@ -125,12 +134,12 @@ if [ "$starting_secure_dn" = "true" ]; t if [ "$HADOOP_PID_DIR" = "" ]; then HADOOP_SECURE_DN_PID="/tmp/hadoop_secure_dn.pid" else - HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid" + HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid" fi JSVC=$JSVC_HOME/jsvc if [ ! -f $JSVC ]; then - echo "JSVC_HOME is not set correctly so jsvc can not be found. Jsvc is required to run secure datanodes. " + echo "JSVC_HOME is not set correctly so jsvc cannot be found. Jsvc is required to run secure datanodes. 
" echo "Please download and install jsvc from http://archive.apache.org/dist/commons/daemon/binaries/ "\ "and set JSVC_HOME to the directory containing the jsvc binary." exit Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh Wed Jun 6 00:17:38 2012 @@ -85,4 +85,15 @@ if [ -n "$SECONDARY_NAMENODES" ]; then --script "$bin/hdfs" start secondarynamenode fi +#--------------------------------------------------------- +# ZK Failover controllers, if auto-HA is enabled +AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled) +if [ "$(echo "$AUTOHA_ENABLED" | tr A-Z a-z)" = "true" ]; then + echo "Starting ZK Failover Controllers on NN hosts [$NAMENODES]" + "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \ + --config "$HADOOP_CONF_DIR" \ + --hostnames "$NAMENODES" \ + --script "$bin/hdfs" start zkfc +fi + # eof Propchange: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1306184-1342109 Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1337003-1346681 Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Jun 6 00:17:38 2012 @@ -107,6 +107,8 @@ public class DFSConfigKeys extends Commo public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000; public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval"; public static final int DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000; + public static final String DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier"; + public static final int DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT = 4; public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource"; public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml"; public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth"; @@ -334,8 +336,8 @@ public class DFSConfigKeys extends Commo public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold"; public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10; - public static final String DFS_FEDERATION_NAMESERVICES = "dfs.federation.nameservices"; - public static final String DFS_FEDERATION_NAMESERVICE_ID = "dfs.federation.nameservice.id"; + public static final String DFS_NAMESERVICES = "dfs.nameservices"; + public static final String DFS_NAMESERVICE_ID = "dfs.nameservice.id"; public static final String 
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval"; public static final int DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000; public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved"; @@ -358,4 +360,8 @@ public class DFSConfigKeys extends Commo public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period"; public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods"; + public static final String DFS_HA_AUTO_FAILOVER_ENABLED_KEY = "dfs.ha.automatic-failover.enabled"; + public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false; + public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port"; + public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019; } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Jun 6 00:17:38 2012 @@ -129,11 +129,13 @@ public class DFSOutputStream extends FSO private long initialFileSize = 0; // at time of file open private Progressable progress; private final short blockReplication; // replication factor of file + private boolean shouldSyncBlock = false; // force blocks to disk upon close private class Packet { long seqno; // sequencenumber of buffer in block long offsetInBlock; // offset in block - boolean lastPacketInBlock; 
// is this the last packet in block? + private boolean lastPacketInBlock; // is this the last packet in block? + boolean syncBlock; // this packet forces the current block to disk int numChunks; // number of chunks currently in packet int maxChunks; // max chunks in packet @@ -245,7 +247,7 @@ public class DFSOutputStream extends FSO buffer.mark(); PacketHeader header = new PacketHeader( - pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen); + pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); header.putInBuffer(buffer); buffer.reset(); @@ -507,8 +509,15 @@ public class DFSOutputStream extends FSO } // write out data to remote datanode - blockStream.write(buf.array(), buf.position(), buf.remaining()); - blockStream.flush(); + try { + blockStream.write(buf.array(), buf.position(), buf.remaining()); + blockStream.flush(); + } catch (IOException e) { + // HDFS-3398 treat primary DN is down since client is unable to + // write to primary DN + errorIndex = 0; + throw e; + } lastPacket = System.currentTimeMillis(); if (one.isHeartbeatPacket()) { //heartbeat packet @@ -965,6 +974,7 @@ public class DFSOutputStream extends FSO DatanodeInfo[] nodes = null; int count = dfsClient.getConf().nBlockWriteRetry; boolean success = false; + ExtendedBlock oldBlock = block; do { hasError = false; lastException = null; @@ -972,9 +982,11 @@ public class DFSOutputStream extends FSO success = false; long startTime = System.currentTimeMillis(); - DatanodeInfo[] w = excludedNodes.toArray( + DatanodeInfo[] excluded = excludedNodes.toArray( new DatanodeInfo[excludedNodes.size()]); - lb = locateFollowingBlock(startTime, w.length > 0 ? w : null); + block = oldBlock; + lb = locateFollowingBlock(startTime, + excluded.length > 0 ? 
excluded : null); block = lb.getBlock(); block.setNumBytes(0); accessToken = lb.getBlockToken(); @@ -1239,6 +1251,7 @@ public class DFSOutputStream extends FSO long blockSize, Progressable progress, int buffersize, DataChecksum checksum) throws IOException { this(dfsClient, src, blockSize, progress, checksum, replication); + this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); computePacketChunkSize(dfsClient.getConf().writePacketSize, checksum.getBytesPerChecksum()); @@ -1421,6 +1434,7 @@ public class DFSOutputStream extends FSO currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, bytesCurBlock); currentPacket.lastPacketInBlock = true; + currentPacket.syncBlock = shouldSyncBlock; waitAndQueueCurrentPacket(); bytesCurBlock = 0; lastFlushOffset = 0; @@ -1440,6 +1454,24 @@ public class DFSOutputStream extends FSO */ @Override public void hflush() throws IOException { + flushOrSync(false); + } + + /** + * The expected semantics is all data have flushed out to all replicas + * and all replicas have done posix fsync equivalent - ie the OS has + * flushed it to the disk device (but the disk may have it in its cache). + * + * Note that only the current block is flushed to the disk device. + * To guarantee durable sync across block boundaries the stream should + * be created with {@link CreateFlag#SYNC_BLOCK}. + */ + @Override + public void hsync() throws IOException { + flushOrSync(true); + } + + private void flushOrSync(boolean isSync) throws IOException { dfsClient.checkOpen(); isClosed(); try { @@ -1467,7 +1499,13 @@ public class DFSOutputStream extends FSO assert bytesCurBlock > lastFlushOffset; // record the valid offset of this flush lastFlushOffset = bytesCurBlock; - waitAndQueueCurrentPacket(); + if (isSync && currentPacket == null) { + // Nothing to send right now, + // but sync was requested. + // Send an empty packet + currentPacket = new Packet(packetSize, chunksPerPacket, + bytesCurBlock); + } } else { // We already flushed up to this offset. 
// This means that we haven't written anything since the last flush @@ -1477,8 +1515,21 @@ public class DFSOutputStream extends FSO assert oldCurrentPacket == null : "Empty flush should not occur with a currentPacket"; - // just discard the current packet since it is already been sent. - currentPacket = null; + if (isSync && bytesCurBlock > 0) { + // Nothing to send right now, + // and the block was partially written, + // and sync was requested. + // So send an empty sync packet. + currentPacket = new Packet(packetSize, chunksPerPacket, + bytesCurBlock); + } else { + // just discard the current packet since it is already been sent. + currentPacket = null; + } + } + if (currentPacket != null) { + currentPacket.syncBlock = isSync; + waitAndQueueCurrentPacket(); } // Restore state of stream. Record the last flush offset // of the last full chunk that was flushed. @@ -1530,18 +1581,6 @@ public class DFSOutputStream extends FSO } /** - * The expected semantics is all data have flushed out to all replicas - * and all replicas have done posix fsync equivalent - ie the OS has - * flushed it to the disk device (but the disk may have it in its cache). - * - * Right now by default it is implemented as hflush - */ - @Override - public synchronized void hsync() throws IOException { - hflush(); - } - - /** * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}. 
*/ @Deprecated @@ -1665,6 +1704,7 @@ public class DFSOutputStream extends FSO currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, bytesCurBlock); currentPacket.lastPacketInBlock = true; + currentPacket.syncBlock = shouldSyncBlock; } flushInternal(); // flush all data to Datanodes Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Jun 6 00:17:38 2012 @@ -42,6 +42,7 @@ import org.apache.hadoop.HadoopIllegalAr import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -288,7 +289,7 @@ public class DFSUtil { * @return collection of nameservice Ids, or null if not specified */ public static Collection<String> getNameServiceIds(Configuration conf) { - return conf.getTrimmedStringCollection(DFS_FEDERATION_NAMESERVICES); + return conf.getTrimmedStringCollection(DFS_NAMESERVICES); } /** @@ -609,6 +610,14 @@ public class DFSUtil { public static Collection<URI> getNameServiceUris(Configuration conf, String... keys) { Set<URI> ret = new HashSet<URI>(); + + // We're passed multiple possible configuration keys for any given NN or HA + // nameservice, and search the config in order of these keys.
In order to + // make sure that a later config lookup (e.g. fs.defaultFS) doesn't add a + // URI for a config key for which we've already found a preferred entry, we + // keep track of non-preferred keys here. + Set<URI> nonPreferredUris = new HashSet<URI>(); + for (String nsId : getNameServiceIds(conf)) { if (HAUtil.isHAEnabled(conf, nsId)) { // Add the logical URI of the nameservice. @@ -619,24 +628,46 @@ public class DFSUtil { } } else { // Add the URI corresponding to the address of the NN. + boolean uriFound = false; for (String key : keys) { String addr = conf.get(concatSuffixes(key, nsId)); if (addr != null) { - ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME, - NetUtils.createSocketAddr(addr))); - break; + URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME, + NetUtils.createSocketAddr(addr)); + if (!uriFound) { + uriFound = true; + ret.add(uri); + } else { + nonPreferredUris.add(uri); + } } } } } + // Add the generic configuration keys. + boolean uriFound = false; for (String key : keys) { String addr = conf.get(key); if (addr != null) { - ret.add(createUri("hdfs", NetUtils.createSocketAddr(addr))); - break; + URI uri = createUri("hdfs", NetUtils.createSocketAddr(addr)); + if (!uriFound) { + uriFound = true; + ret.add(uri); + } else { + nonPreferredUris.add(uri); + } } } + + // Add the default URI if it is an HDFS URI. + URI defaultUri = FileSystem.getDefaultUri(conf); + if (defaultUri != null && + HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) && + !nonPreferredUris.contains(defaultUri)) { + ret.add(defaultUri); + } + return ret; } @@ -676,9 +707,10 @@ public class DFSUtil { * @param httpsAddress -If true, and if security is enabled, returns server * https address. If false, returns server http address.
* @return server http or https address + * @throws IOException */ - public static String getInfoServer( - InetSocketAddress namenodeAddr, Configuration conf, boolean httpsAddress) { + public static String getInfoServer(InetSocketAddress namenodeAddr, + Configuration conf, boolean httpsAddress) throws IOException { boolean securityOn = UserGroupInformation.isSecurityEnabled(); String httpAddressKey = (securityOn && httpsAddress) ? DFS_NAMENODE_HTTPS_ADDRESS_KEY : DFS_NAMENODE_HTTP_ADDRESS_KEY; @@ -695,8 +727,14 @@ public class DFSUtil { } else { suffixes = new String[2]; } - - return getSuffixedConf(conf, httpAddressKey, httpAddressDefault, suffixes); + String configuredInfoAddr = getSuffixedConf(conf, httpAddressKey, + httpAddressDefault, suffixes); + if (namenodeAddr != null) { + return substituteForWildcardAddress(configuredInfoAddr, + namenodeAddr.getHostName()); + } else { + return configuredInfoAddr; + } } @@ -721,7 +759,7 @@ public class DFSUtil { if (UserGroupInformation.isSecurityEnabled() && defaultSockAddr.getAddress().isAnyLocalAddress()) { throw new IOException("Cannot use a wildcard address with security. " + - "Must explicitly set bind address for Kerberos"); + "Must explicitly set bind address for Kerberos"); } return defaultHost + ":" + sockAddr.getPort(); } else { @@ -843,7 +881,7 @@ public class DFSUtil { * Get the nameservice Id by matching the {@code addressKey} with the * the address of the local node. * - * If {@link DFSConfigKeys#DFS_FEDERATION_NAMESERVICE_ID} is not specifically + * If {@link DFSConfigKeys#DFS_NAMESERVICE_ID} is not specifically * configured, and more than one nameservice Id is configured, this method * determines the nameservice Id by matching the local node's address with the * configured addresses. 
When a match is found, it returns the nameservice Id @@ -855,7 +893,7 @@ public class DFSUtil { * @throws HadoopIllegalArgumentException on error */ private static String getNameServiceId(Configuration conf, String addressKey) { - String nameserviceId = conf.get(DFS_FEDERATION_NAMESERVICE_ID); + String nameserviceId = conf.get(DFS_NAMESERVICE_ID); if (nameserviceId != null) { return nameserviceId; } @@ -927,7 +965,7 @@ public class DFSUtil { if (found > 1) { // Only one address must match the local address String msg = "Configuration has multiple addresses that match " + "local node's address. Please configure the system with " - + DFS_FEDERATION_NAMESERVICE_ID + " and " + + DFS_NAMESERVICE_ID + " and " + DFS_HA_NAMENODE_ID_KEY; throw new HadoopIllegalArgumentException(msg); } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Jun 6 00:17:38 2012 @@ -223,12 +223,19 @@ public class DistributedFileSystem exten @Override public HdfsDataOutputStream create(Path f, FsPermission permission, - boolean overwrite, int bufferSize, short replication, long blockSize, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return create(f, permission, + overwrite ? 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, + blockSize, progress); + } + + @Override + public HdfsDataOutputStream create(Path f, FsPermission permission, + EnumSet cflags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { statistics.incrementWriteOps(1); - final EnumSet cflags = overwrite? - EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) - : EnumSet.of(CreateFlag.CREATE); final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags, replication, blockSize, progress, bufferSize); return new HdfsDataOutputStream(out, statistics); @@ -249,6 +256,7 @@ public class DistributedFileSystem exten /** * Same as create(), except fails if parent directory doesn't already exist. */ + @Override public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission, EnumSet flag, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Wed Jun 6 00:17:38 2012 @@ -142,7 +142,7 @@ public class HAUtil { Preconditions.checkArgument(nsId != null, "Could not determine namespace id. 
Please ensure that this " + "machine is one of the machines listed as a NN RPC address, " + - "or configure " + DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID); + "or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID); Collection nnIds = DFSUtil.getNameNodeIds(myConf, nsId); String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY); Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Wed Jun 6 00:17:38 2012 @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.ZKFCProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -47,6 +48,8 @@ public class HDFSPolicyProvider extends new Service("security.namenode.protocol.acl", NamenodeProtocol.class), new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL, HAServiceProtocol.class), + new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL, + ZKFCProtocol.class), new Service( CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY, RefreshAuthorizationPolicyProtocol.class), Modified: 
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Wed Jun 6 00:17:38 2012 @@ -63,7 +63,7 @@ public class HdfsConfiguration extends C } private static void deprecate(String oldKey, String newKey) { - Configuration.addDeprecation(oldKey, new String[]{newKey}); + Configuration.addDeprecation(oldKey, newKey); } private static void addDeprecatedKeys() { @@ -102,5 +102,7 @@ public class HdfsConfiguration extends C deprecate("dfs.block.size", DFSConfigKeys.DFS_BLOCK_SIZE_KEY); deprecate("dfs.datanode.max.xcievers", DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY); deprecate("io.bytes.per.checksum", DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY); + deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES); + deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID); } } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original) +++ 
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Wed Jun 6 00:17:38 2012 @@ -214,6 +214,17 @@ public class Block implements Writable, } return compareTo((Block)o) == 0; } + + /** + * @return true if the two blocks have the same block ID and the same + * generation stamp, or if both blocks are null. + */ + public static boolean matchingIdAndGenStamp(Block a, Block b) { + if (a == b) return true; // same block, or both null + if (a == null || b == null) return false; // only one null + return a.blockId == b.blockId && + a.generationStamp == b.generationStamp; + } @Override // Object public int hashCode() { Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Jun 6 00:17:38 2012 @@ -309,6 +309,7 @@ public interface ClientProtocol { * @throws UnresolvedLinkException If src contains a symlink * @throws IOException If an I/O error occurred */ + @Idempotent public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes) throws AccessControlException, FileNotFoundException, @@ -362,6 +363,7 @@ public interface ClientProtocol { * @throws UnresolvedLinkException If src contains a symlink * @throws IOException If an I/O error occurred */ + @Idempotent public boolean complete(String src, String clientName, 
ExtendedBlock last) throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException; Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Jun 6 00:17:38 2012 @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.DFSConfigKeys; /** * This class represents the primary identifier for a Datanode. 
@@ -45,23 +44,6 @@ public class DatanodeID implements Compa protected int infoPort; // info server port protected int ipcPort; // IPC server port - public DatanodeID(String ipAddr, int xferPort) { - this(ipAddr, "", "", xferPort, - DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, - DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT); - } - - public DatanodeID(String ipAddr, String hostName, int xferPort) { - this(ipAddr, hostName, "", xferPort, - DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, - DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT); - } - - /** - * DatanodeID copy constructor - * - * @param from - */ public DatanodeID(DatanodeID from) { this(from.getIpAddr(), from.getHostName(), @@ -72,7 +54,7 @@ public class DatanodeID implements Compa } /** - * Create DatanodeID + * Create a DatanodeID * @param ipAddr IP * @param hostName hostname * @param storageID data storage ID @@ -94,22 +76,6 @@ public class DatanodeID implements Compa this.ipAddr = ipAddr; } - public void setHostName(String hostName) { - this.hostName = hostName; - } - - public void setXferPort(int xferPort) { - this.xferPort = xferPort; - } - - public void setInfoPort(int infoPort) { - this.infoPort = infoPort; - } - - public void setIpcPort(int ipcPort) { - this.ipcPort = ipcPort; - } - public void setStorageID(String storageID) { this.storageID = storageID; } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original) +++ 
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Jun 6 00:17:38 2012 @@ -22,11 +22,11 @@ import org.apache.hadoop.classification. import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.token.Token; -/**************************************************** - * A LocatedBlock is a pair of Block, DatanodeInfo[] - * objects. It tells where to find a Block. - * - ****************************************************/ +/** + * Associates a block with the Datanodes that contain its replicas + * and other block metadata (E.g. the file offset associated with this + * block, whether it is corrupt, security token, etc). + */ @InterfaceAudience.Private @InterfaceStability.Evolving public class LocatedBlock { @@ -40,19 +40,6 @@ public class LocatedBlock { private boolean corrupt; private Token blockToken = new Token(); - public LocatedBlock() { - this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false); - } - - - public LocatedBlock(ExtendedBlock eb) { - this(eb, new DatanodeInfo[0], 0L, false); - } - - public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) { - this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown - } - public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) { this(b, locs, -1, false); // startOffset is unknown } @@ -81,14 +68,10 @@ public class LocatedBlock { this.blockToken = token; } - /** - */ public ExtendedBlock getBlock() { return b; } - /** - */ public DatanodeInfo[] getLocations() { return locs; } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1346682&r1=1346681&r2=1346682&view=diff 
============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Wed Jun 6 00:17:38 2012 @@ -105,8 +105,9 @@ public class LocatedBlocks { * @return block if found, or null otherwise. */ public int findBlock(long offset) { - // create fake block of size 1 as a key - LocatedBlock key = new LocatedBlock(); + // create fake block of size 0 as a key + LocatedBlock key = new LocatedBlock( + new ExtendedBlock(), new DatanodeInfo[0], 0L, false); key.setStartOffset(offset); key.getBlock().setNumBytes(1); Comparator comp = Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Wed Jun 6 00:17:38 2012 @@ -40,6 +40,7 @@ public class PacketHeader { .setSeqno(0) .setLastPacketInBlock(false) .setDataLen(0) + .setSyncBlock(false) .build().getSerializedSize(); public static final int PKT_HEADER_LEN = 6 + PROTO_SIZE; @@ -51,13 +52,14 @@ public class PacketHeader { } public PacketHeader(int packetLen, long offsetInBlock, long seqno, - boolean lastPacketInBlock, int dataLen) { + boolean lastPacketInBlock, int 
dataLen, boolean syncBlock) { this.packetLen = packetLen; proto = PacketHeaderProto.newBuilder() .setOffsetInBlock(offsetInBlock) .setSeqno(seqno) .setLastPacketInBlock(lastPacketInBlock) .setDataLen(dataLen) + .setSyncBlock(syncBlock) .build(); } @@ -81,6 +83,10 @@ public class PacketHeader { return packetLen; } + public boolean getSyncBlock() { + return proto.getSyncBlock(); + } + @Override public String toString() { return "PacketHeader with packetLen=" + packetLen + Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java Wed Jun 6 00:17:38 2012 @@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.protocolP import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; +@KerberosInfo( + serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY) @ProtocolInfo( protocolName = "org.apache.hadoop.tools.GetUserMappingsProtocol", protocolVersion = 1) Modified: 
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java Wed Jun 6 00:17:38 2012 @@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto; @@ -104,6 +106,20 @@ public class NamenodeProtocolServerSideT } return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build(); } + + @Override + public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId( + RpcController unused, GetMostRecentCheckpointTxIdRequestProto request) + 
throws ServiceException { + long txid; + try { + txid = impl.getMostRecentCheckpointTxId(); + } catch (IOException e) { + throw new ServiceException(e); + } + return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build(); + } + @Override public RollEditLogResponseProto rollEditLog(RpcController unused, Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Wed Jun 6 00:17:38 2012 @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto; @@ -120,6 +121,16 @@ public class NamenodeProtocolTranslatorP } @Override + public long getMostRecentCheckpointTxId() throws IOException { + 
try { + return rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER, + GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override public CheckpointSignature rollEditLog() throws IOException { try { return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER, Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Jun 6 00:17:38 2012 @@ -254,11 +254,11 @@ public class PBHelper { public static BlockWithLocationsProto convert(BlockWithLocations blk) { return BlockWithLocationsProto.newBuilder() .setBlock(convert(blk.getBlock())) - .addAllDatanodeIDs(Arrays.asList(blk.getDatanodes())).build(); + .addAllStorageIDs(Arrays.asList(blk.getStorageIDs())).build(); } public static BlockWithLocations convert(BlockWithLocationsProto b) { - return new BlockWithLocations(convert(b.getBlock()), b.getDatanodeIDsList() + return new BlockWithLocations(convert(b.getBlock()), b.getStorageIDsList() .toArray(new String[0])); } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Jun 6 00:17:38 2012 @@ -205,6 +205,7 @@ public class Balancer { private Map globalBlockList = new HashMap(); private MovedBlocks movedBlocks = new MovedBlocks(); + // Map storage IDs to BalancerDatanodes private Map datanodes = new HashMap(); @@ -262,9 +263,9 @@ public class Balancer { if (LOG.isDebugEnabled()) { LOG.debug("Decided to move block "+ block.getBlockId() +" with a length of "+StringUtils.byteDesc(block.getNumBytes()) - + " bytes from " + source.getName() - + " to " + target.getName() - + " using proxy source " + proxySource.getName() ); + + " bytes from " + source.getDisplayName() + + " to " + target.getDisplayName() + + " using proxy source " + proxySource.getDisplayName() ); } return true; } @@ -317,15 +318,15 @@ public class Balancer { receiveResponse(in); bytesMoved.inc(block.getNumBytes()); LOG.info( "Moving block " + block.getBlock().getBlockId() + - " from "+ source.getName() + " to " + - target.getName() + " through " + - proxySource.getName() + + " from "+ source.getDisplayName() + " to " + + target.getDisplayName() + " through " + + proxySource.getDisplayName() + " is succeeded." 
); } catch (IOException e) { LOG.warn("Error moving block "+block.getBlockId()+ - " from " + source.getName() + " to " + - target.getName() + " through " + - proxySource.getName() + + " from " + source.getDisplayName() + " to " + + target.getDisplayName() + " through " + + proxySource.getDisplayName() + ": "+e.getMessage()); } finally { IOUtils.closeStream(out); @@ -378,7 +379,8 @@ public class Balancer { public void run() { if (LOG.isDebugEnabled()) { LOG.debug("Starting moving "+ block.getBlockId() + - " from " + proxySource.getName() + " to " + target.getName()); + " from " + proxySource.getDisplayName() + " to " + + target.getDisplayName()); } dispatch(); } @@ -475,7 +477,7 @@ public class Balancer { @Override public String toString() { - return getClass().getSimpleName() + "[" + getName() + return getClass().getSimpleName() + "[" + datanode + ", utilization=" + utilization + "]"; } @@ -507,8 +509,8 @@ public class Balancer { } /** Get the name of the datanode */ - protected String getName() { - return datanode.getName(); + protected String getDisplayName() { + return datanode.toString(); } /* Get the storage id of the datanode */ @@ -620,8 +622,8 @@ public class Balancer { synchronized (block) { // update locations - for ( String location : blk.getDatanodes() ) { - BalancerDatanode datanode = datanodes.get(location); + for ( String storageID : blk.getStorageIDs() ) { + BalancerDatanode datanode = datanodes.get(storageID); if (datanode != null) { // not an unknown datanode block.addLocation(datanode); } @@ -831,7 +833,7 @@ public class Balancer { this.aboveAvgUtilizedDatanodes.add((Source)datanodeS); } else { assert(isOverUtilized(datanodeS)) : - datanodeS.getName()+ "is not an overUtilized node"; + datanodeS.getDisplayName()+ "is not an overUtilized node"; this.overUtilizedDatanodes.add((Source)datanodeS); overLoadedBytes += (long)((datanodeS.utilization-avg -threshold)*datanodeS.datanode.getCapacity()/100.0); @@ -842,7 +844,7 @@ public class Balancer { 
this.belowAvgUtilizedDatanodes.add(datanodeS); } else { assert isUnderUtilized(datanodeS) : "isUnderUtilized(" - + datanodeS.getName() + ")=" + isUnderUtilized(datanodeS) + + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS) + ", utilization=" + datanodeS.utilization; this.underUtilizedDatanodes.add(datanodeS); underLoadedBytes += (long)((avg-threshold- Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Wed Jun 6 00:17:38 2012 @@ -200,7 +200,7 @@ class NameNodeConnector { Thread.sleep(keyUpdaterInterval); } } catch (InterruptedException e) { - LOG.info("InterruptedException in block key updater thread", e); + LOG.debug("InterruptedException in block key updater thread", e); } catch (Throwable e) { LOG.error("Exception in block key updater thread", e); shouldRun = false; Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Wed Jun 6 00:17:38 2012 @@ -19,9 +19,6 @@ package org.apache.hadoop.hdfs.server.bl import java.io.IOException; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.fs.ContentSummary; /** @@ -31,19 +28,24 @@ import org.apache.hadoop.fs.ContentSumma public interface BlockCollection { /** * Get the last block of the collection. - * Make sure it has the right type. */ - public T getLastBlock() throws IOException; + public BlockInfo getLastBlock() throws IOException; /** * Get content summary. */ public ContentSummary computeContentSummary(); - /** @return the number of blocks */ + /** + * @return the number of blocks + */ public int numBlocks(); + /** + * Get the blocks. + */ public BlockInfo[] getBlocks(); + /** * Get preferred block size for the collection * @return preferred block size in bytes @@ -57,7 +59,7 @@ public interface BlockCollection { public short getReplication(); /** - * Get name of collection. + * Get the name of the collection. 
*/ public String getName(); } Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jun 6 00:17:38 2012 @@ -437,7 +437,7 @@ public class BlockManager { * @throws IOException if the block does not have at least a minimal number * of replicas reported from data-nodes. */ - private boolean commitBlock(final BlockInfoUnderConstruction block, + private static boolean commitBlock(final BlockInfoUnderConstruction block, final Block commitBlock) throws IOException { if (block.getBlockUCState() == BlockUCState.COMMITTED) return false;