Author: umamahesh Date: Thu Nov 8 18:36:37 2012 New Revision: 1407209 URL: http://svn.apache.org/viewvc?rev=1407209&view=rev Log: HDFS-3810. Implement format() for BKJM. Contributed by Ivan Kelly. Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/ (props changed) hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed) hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project:r1407182 Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1407182 Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1407209&r1=1407208&r2=1407209&view=diff 
============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 8 18:36:37 2012 @@ -206,6 +206,8 @@ Release 2.0.3-alpha - Unreleased HDFS-3979. For hsync, datanode should wait for the local sync to complete before sending ack. (Lars Hofhansl via szetszwo) + HDFS-3810. Implement format() for BKJM (Ivan Kelly via umamahesh) + HDFS-3625. Fix TestBackupNode by properly initializing edit log during startup. (Junping Du via todd) Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1407209&r1=1407208&r2=1407209&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java Thu Nov 8 18:36:37 2012 @@ -39,6 +39,7 @@ import org.apache.zookeeper.KeeperExcept import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.ZKUtil; import java.util.Collection; import java.util.Collections; @@ -46,6 +47,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import 
java.io.IOException; import java.net.URI; @@ -142,13 +144,16 @@ public class BookKeeperJournalManager im private final Configuration conf; private final BookKeeper bkc; private final CurrentInprogress ci; + private final String basePath; private final String ledgerPath; + private final String versionPath; private final MaxTxId maxTxId; private final int ensembleSize; private final int quorumSize; private final String digestpw; private final CountDownLatch zkConnectLatch; private final NamespaceInfo nsInfo; + private boolean initialized = false; private LedgerHandle currentLedger = null; /** @@ -160,16 +165,16 @@ public class BookKeeperJournalManager im this.nsInfo = nsInfo; String zkConnect = uri.getAuthority().replace(";", ","); - String zkPath = uri.getPath(); + basePath = uri.getPath(); ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE, BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT); quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE, BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT); - ledgerPath = zkPath + "/ledgers"; - String maxTxIdPath = zkPath + "/maxtxid"; - String currentInprogressNodePath = zkPath + "/CurrentInprogress"; - String versionPath = zkPath + "/version"; + ledgerPath = basePath + "/ledgers"; + String maxTxIdPath = basePath + "/maxtxid"; + String currentInprogressNodePath = basePath + "/CurrentInprogress"; + versionPath = basePath + "/version"; digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW, BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT); @@ -180,47 +185,7 @@ public class BookKeeperJournalManager im if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) { throw new IOException("Error connecting to zookeeper"); } - if (zkc.exists(zkPath, false) == null) { - zkc.create(zkPath, new byte[] {'0'}, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - Stat versionStat = zkc.exists(versionPath, false); - if (versionStat != null) { - byte[] d = zkc.getData(versionPath, false, versionStat); - VersionProto.Builder builder = VersionProto.newBuilder(); - TextFormat.merge(new 
String(d, UTF_8), builder); - if (!builder.isInitialized()) { - throw new IOException("Invalid/Incomplete data in znode"); - } - VersionProto vp = builder.build(); - - // There's only one version at the moment - assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION; - - NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo()); - - if (nsInfo.getNamespaceID() != readns.getNamespaceID() || - !nsInfo.clusterID.equals(readns.getClusterID()) || - !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) { - String err = String.format("Environment mismatch. Running process %s" - +", stored in ZK %s", nsInfo, readns); - LOG.error(err); - throw new IOException(err); - } - } else if (nsInfo.getNamespaceID() > 0) { - VersionProto.Builder builder = VersionProto.newBuilder(); - builder.setNamespaceInfo(PBHelper.convert(nsInfo)) - .setLayoutVersion(BKJM_LAYOUT_VERSION); - byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8); - zkc.create(versionPath, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - - if (zkc.exists(ledgerPath, false) == null) { - zkc.create(ledgerPath, new byte[] {'0'}, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } prepareBookKeeperEnv(); bkc = new BookKeeper(new ClientConfiguration(), zkc); } catch (KeeperException e) { @@ -244,6 +209,7 @@ public class BookKeeperJournalManager im BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT); final CountDownLatch zkPathLatch = new CountDownLatch(1); + final AtomicBoolean success = new AtomicBoolean(false); StringCallback callback = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { @@ -251,22 +217,23 @@ public class BookKeeperJournalManager im || KeeperException.Code.NODEEXISTS.intValue() == rc) { LOG.info("Successfully created bookie available path : " + zkAvailablePath); - zkPathLatch.countDown(); + success.set(true); } else { KeeperException.Code code = KeeperException.Code.get(rc); - LOG - .error("Error : " + LOG.error("Error : " + 
KeeperException.create(code, path).getMessage() + ", failed to create bookie available path : " + zkAvailablePath); } + zkPathLatch.countDown(); } }; ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null); try { - if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)) { + if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS) + || !success.get()) { throw new IOException("Couldn't create bookie available path :" + zkAvailablePath + ", timed out " + zkc.getSessionTimeout() + " millis"); @@ -281,19 +248,101 @@ public class BookKeeperJournalManager im @Override public void format(NamespaceInfo ns) throws IOException { - // Currently, BKJM automatically formats itself when first accessed. - // TODO: change over to explicit formatting so that the admin can - // clear out the BK storage when reformatting a cluster. - LOG.info("Not formatting " + this + " - BKJM does not currently " + - "support reformatting. If it has not been used before, it will" + - "be formatted automatically upon first use."); + try { + // delete old info + Stat baseStat = null; + Stat ledgerStat = null; + if ((baseStat = zkc.exists(basePath, false)) != null) { + if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) { + for (EditLogLedgerMetadata l : getLedgerList(true)) { + try { + bkc.deleteLedger(l.getLedgerId()); + } catch (BKException.BKNoSuchLedgerExistsException bke) { + LOG.warn("Ledger " + l.getLedgerId() + " does not exist;" + + " Cannot delete."); + } + } + } + ZKUtil.deleteRecursive(zkc, basePath); + } + + // should be clean now. 
+ zkc.create(basePath, new byte[] {'0'}, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + VersionProto.Builder builder = VersionProto.newBuilder(); + builder.setNamespaceInfo(PBHelper.convert(ns)) + .setLayoutVersion(BKJM_LAYOUT_VERSION); + + byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8); + zkc.create(versionPath, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + zkc.create(ledgerPath, new byte[] {'0'}, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException ke) { + LOG.error("Error accessing zookeeper to format", ke); + throw new IOException("Error accessing zookeeper to format", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during format", ie); + } catch (BKException bke) { + throw new IOException("Error cleaning up ledgers during format", bke); + } } @Override public boolean hasSomeData() throws IOException { - // Don't confirm format on BKJM, since format() is currently a - // no-op anyway - return false; + try { + return zkc.exists(basePath, false) != null; + } catch (KeeperException ke) { + throw new IOException("Couldn't contact zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while checking for data", ie); + } + } + + synchronized private void checkEnv() throws IOException { + if (!initialized) { + try { + Stat versionStat = zkc.exists(versionPath, false); + if (versionStat == null) { + throw new IOException("Environment not initialized. 
" + +"Have you forgotten to format?"); + } + byte[] d = zkc.getData(versionPath, false, versionStat); + + VersionProto.Builder builder = VersionProto.newBuilder(); + TextFormat.merge(new String(d, UTF_8), builder); + if (!builder.isInitialized()) { + throw new IOException("Invalid/Incomplete data in znode"); + } + VersionProto vp = builder.build(); + + // There's only one version at the moment + assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION; + + NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo()); + + if (nsInfo.getNamespaceID() != readns.getNamespaceID() || + !nsInfo.clusterID.equals(readns.getClusterID()) || + !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) { + String err = String.format("Environment mismatch. Running process %s" + +", stored in ZK %s", nsInfo, readns); + LOG.error(err); + throw new IOException(err); + } + + ci.init(); + initialized = true; + } catch (KeeperException ke) { + throw new IOException("Cannot access ZooKeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while checking environment", ie); + } + } } /** @@ -307,6 +356,8 @@ public class BookKeeperJournalManager im */ @Override public EditLogOutputStream startLogSegment(long txId) throws IOException { + checkEnv(); + if (txId <= maxTxId.get()) { throw new IOException("We've already seen " + txId + ". 
A new stream cannot be created with it"); @@ -384,6 +435,8 @@ public class BookKeeperJournalManager im @Override public void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException { + checkEnv(); + String inprogressPath = inprogressZNode(firstTxId); try { Stat inprogressStat = zkc.exists(inprogressPath, false); @@ -537,6 +590,8 @@ public class BookKeeperJournalManager im @Override public void recoverUnfinalizedSegments() throws IOException { + checkEnv(); + synchronized (this) { try { List<String> children = zkc.getChildren(ledgerPath, false); @@ -589,6 +644,8 @@ public class BookKeeperJournalManager im @Override public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { + checkEnv(); + for (EditLogLedgerMetadata l : getLedgerList(false)) { if (l.getLastTxId() < minTxIdToKeep) { try { Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java?rev=1407209&r1=1407208&r2=1407209&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java Thu Nov 8 18:36:37 2012 @@ -56,6 +56,9 @@ class CurrentInprogress { CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException { this.currentInprogressNode = lockpath; this.zkc = zkc; + } + + void init() throws IOException { try { Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode, false); @@ -96,15 +99,14 @@ class 
CurrentInprogress { this.versionNumberForPermission); } catch (KeeperException e) { throw new IOException("Exception when setting the data " - + "[layout version number,hostname,inprogressNode path]= [" + content - + "] to CurrentInprogress. ", e); + + "[" + content + "] to CurrentInprogress. ", e); } catch (InterruptedException e) { throw new IOException("Interrupted while setting the data " - + "[layout version number,hostname,inprogressNode path]= [" + content - + "] to CurrentInprogress", e); + + "[" + content + "] to CurrentInprogress", e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Updated data[" + content + "] to CurrentInprogress"); } - LOG.info("Updated data[layout version number,hostname,inprogressNode path]" - + "= [" + content + "] to CurrentInprogress"); } /** @@ -136,7 +138,7 @@ class CurrentInprogress { } return builder.build().getPath(); } else { - LOG.info("No data available in CurrentInprogress"); + LOG.debug("No data available in CurrentInprogress"); } return null; } @@ -152,7 +154,7 @@ class CurrentInprogress { throw new IOException( "Interrupted when setting the data to CurrentInprogress node", e); } - LOG.info("Cleared the data from CurrentInprogress"); + LOG.debug("Cleared the data from CurrentInprogress"); } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java?rev=1407209&r1=1407208&r2=1407209&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java (original) +++ 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java Thu Nov 8 18:36:37 2012 @@ -149,6 +149,7 @@ public class TestBookKeeperConfiguration bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"), nsi); + bkjm.format(nsi); Assert.assertNotNull("Bookie available path : " + bkAvailablePath + " doesn't exists", zkc.exists(bkAvailablePath, false)); } @@ -166,6 +167,7 @@ public class TestBookKeeperConfiguration bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"), nsi); + bkjm.format(nsi); Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH + " doesn't exists", zkc.exists(BK_ROOT_PATH, false)); } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1407209&r1=1407208&r2=1407209&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Thu Nov 8 18:36:37 2012 @@ -29,8 +29,16 @@ import org.mockito.Mockito; import java.io.IOException; import java.net.URI; import java.util.List; +import java.util.ArrayList; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; @@ -90,6 +98,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { @@ -112,6 +121,8 @@ public class TestBookKeeperJournalManage BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi); + bkjm.format(nsi); + EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -130,6 +141,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi); + bkjm.format(nsi); long txid = 1; for (long i = 0; i < 3; i++) { @@ -167,6 +179,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi); + bkjm.format(nsi); long txid = 1; for (long i = 0; i < 3; i++) { @@ -208,6 +221,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi); + bkjm.format(nsi); long txid = 1; long start = txid; @@ -266,6 +280,7 @@ public class TestBookKeeperJournalManage BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); + bkjm1.format(nsi); 
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); @@ -288,6 +303,7 @@ public class TestBookKeeperJournalManage BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-simpleread"), nsi); + bkjm.format(nsi); final long numTransactions = 10000; EditLogOutputStream out = bkjm.startLogSegment(1); @@ -315,6 +331,7 @@ public class TestBookKeeperJournalManage BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"), nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { @@ -365,6 +382,7 @@ public class TestBookKeeperJournalManage BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"), nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { @@ -450,6 +468,7 @@ public class TestBookKeeperJournalManage BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { @@ -500,6 +519,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -541,6 +561,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -583,6 +604,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager 
bkjm = new BookKeeperJournalManager(conf, uri, nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -622,6 +644,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, nsi); + bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -669,6 +692,7 @@ public class TestBookKeeperJournalManage NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, nsi); + bkjm.format(nsi); try { // start new inprogress log segment with txid=1 @@ -697,6 +721,81 @@ public class TestBookKeeperJournalManage } } + private enum ThreadStatus { + COMPLETED, GOODEXCEPTION, BADEXCEPTION; + }; + + /** + * Tests that concurrent calls to format will still allow one to succeed. + */ + @Test + public void testConcurrentFormat() throws Exception { + final URI uri = BKJMUtil.createJournalURI("/hdfsjournal-concurrentformat"); + final NamespaceInfo nsi = newNSInfo(); + + // populate with data first + BookKeeperJournalManager bkjm + = new BookKeeperJournalManager(conf, uri, nsi); + bkjm.format(nsi); + for (int i = 1; i < 100*2; i += 2) { + bkjm.startLogSegment(i); + bkjm.finalizeLogSegment(i, i+1); + } + bkjm.close(); + + final int numThreads = 40; + List<Callable<ThreadStatus>> threads + = new ArrayList<Callable<ThreadStatus>>(); + final CyclicBarrier barrier = new CyclicBarrier(numThreads); + + for (int i = 0; i < numThreads; i++) { + threads.add(new Callable<ThreadStatus>() { + public ThreadStatus call() { + BookKeeperJournalManager bkjm = null; + try { + bkjm = new BookKeeperJournalManager(conf, uri, nsi); + barrier.await(); + bkjm.format(nsi); + return ThreadStatus.COMPLETED; + } catch (IOException ioe) { + LOG.info("Exception formatting ", ioe); + return ThreadStatus.GOODEXCEPTION; + } catch (InterruptedException ie) { + LOG.error("Interrupted. 
Something is broken", ie); + Thread.currentThread().interrupt(); + return ThreadStatus.BADEXCEPTION; + } catch (Exception e) { + LOG.error("Some other bad exception", e); + return ThreadStatus.BADEXCEPTION; + } finally { + if (bkjm != null) { + try { + bkjm.close(); + } catch (IOException ioe) { + LOG.error("Error closing journal manager", ioe); + } + } + } + } + }); + } + ExecutorService service = Executors.newFixedThreadPool(numThreads); + List<Future<ThreadStatus>> statuses = service.invokeAll(threads, 60, + TimeUnit.SECONDS); + int numCompleted = 0; + for (Future<ThreadStatus> s : statuses) { + assertTrue(s.isDone()); + assertTrue("Thread threw invalid exception", + s.get() == ThreadStatus.COMPLETED + || s.get() == ThreadStatus.GOODEXCEPTION); + if (s.get() == ThreadStatus.COMPLETED) { + numCompleted++; + } + } + LOG.info("Completed " + numCompleted + " formats"); + assertTrue("No thread managed to complete formatting", numCompleted > 0); + } + private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm, int startTxid, int endTxid) throws IOException, KeeperException, InterruptedException { Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java?rev=1407209&r1=1407208&r2=1407209&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java Thu Nov 8 18:36:37 2012 @@ -118,6 +118,7 @@ public class 
TestCurrentInprogress { public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception { String data = "inprogressNode"; CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.init(); ci.update(data); String inprogressNodePath = ci.read(); assertEquals("Not returning inprogressZnode", "inprogressNode", @@ -131,6 +132,7 @@ public class TestCurrentInprogress { @Test public void testReadShouldReturnNullAfterClear() throws Exception { CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.init(); ci.update("myInprogressZnode"); ci.read(); ci.clear(); @@ -146,6 +148,7 @@ public class TestCurrentInprogress { public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead() throws Exception { CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.init(); ci.update("myInprogressZnode"); assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci .read()); @@ -154,4 +157,4 @@ public class TestCurrentInprogress { ci.update("myInprogressZnode"); } -} \ No newline at end of file +}