Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A0A789EC2 for ; Tue, 29 May 2012 18:50:36 +0000 (UTC) Received: (qmail 61260 invoked by uid 500); 29 May 2012 18:50:36 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 61235 invoked by uid 500); 29 May 2012 18:50:36 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 61227 invoked by uid 99); 29 May 2012 18:50:36 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 May 2012 18:50:36 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 May 2012 18:50:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9D77D2388B71; Tue, 29 May 2012 18:50:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1343913 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src: main/java/org/apache/hadoop/contrib/bkjournal/ test/java/org/apache/hadoop/contrib/bkjournal/ Date: Tue, 29 May 2012 18:50:08 -0000 To: hdfs-commits@hadoop.apache.org From: umamahesh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120529185008.9D77D2388B71@eris.apache.org> Author: umamahesh Date: Tue May 29 18:50:07 2012 New Revision: 1343913 URL: http://svn.apache.org/viewvc?rev=1343913&view=rev Log: HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down due to delay in clearing of lock. Contributed by Uma Maheswara Rao G. Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java Removed: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java?rev=1343913&r1=1343912&r2=1343913&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java Tue May 29 18:50:07 2012 @@ -56,15 +56,13 @@ class BookKeeperEditLogOutputStream private CountDownLatch syncLatch; private final AtomicInteger transmitResult = new AtomicInteger(BKException.Code.OK); - private final WriteLock wl; private final Writer writer; /** * Construct an edit log output stream which writes to a ledger. */ - protected BookKeeperEditLogOutputStream(Configuration conf, - LedgerHandle lh, WriteLock wl) + protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh) throws IOException { super(); @@ -72,8 +70,6 @@ class BookKeeperEditLogOutputStream outstandingRequests = new AtomicInteger(0); syncLatch = null; this.lh = lh; - this.wl = wl; - this.wl.acquire(); this.writer = new Writer(bufCurrent); this.transmissionThreshold = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE, @@ -108,7 +104,6 @@ class BookKeeperEditLogOutputStream throw new IOException("BookKeeper error during abort", bke); } - wl.release(); } @Override @@ -118,8 +113,6 @@ class BookKeeperEditLogOutputStream @Override public void write(FSEditLogOp op) throws IOException { - wl.checkWriteLock(); - writer.writeOp(op); if (bufCurrent.getLength() > transmissionThreshold) { @@ -129,19 +122,15 @@ class BookKeeperEditLogOutputStream @Override public void setReadyToFlush() throws IOException { - wl.checkWriteLock(); - transmit(); - synchronized(this) { + synchronized (this) { syncLatch = new CountDownLatch(outstandingRequests.get()); } } @Override public void flushAndSync() throws IOException { - wl.checkWriteLock(); - assert(syncLatch != null); try { syncLatch.await(); @@ -164,8 +153,6 @@ class BookKeeperEditLogOutputStream * are never called at the same time. */ private void transmit() throws IOException { - wl.checkWriteLock(); - if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) { throw new IOException("Trying to write to an errored stream;" Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1343913&r1=1343912&r2=1343913&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java Tue May 29 18:50:07 2012 @@ -117,7 +117,7 @@ public class BookKeeperJournalManager im private final ZooKeeper zkc; private final Configuration conf; private final BookKeeper bkc; - private final WriteLock wl; + private final CurrentInprogress ci; private final String ledgerPath; private final MaxTxId maxTxId; private final int ensembleSize; @@ -155,7 +155,7 @@ public class BookKeeperJournalManager im ledgerPath = zkPath + "/ledgers"; String maxTxIdPath = zkPath + "/maxtxid"; - String lockPath = zkPath + "/lock"; + String currentInprogressNodePath = zkPath + "/CurrentInprogress"; String versionPath = zkPath + "/version"; digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW, BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT); @@ -192,7 +192,7 @@ public class BookKeeperJournalManager im throw new IOException("Error initializing zk", e); } - wl = new WriteLock(zkc, lockPath); + ci = new CurrentInprogress(zkc, currentInprogressNodePath); maxTxId = new MaxTxId(zkc, maxTxIdPath); } @@ -207,13 +207,16 @@ public class BookKeeperJournalManager im */ @Override public EditLogOutputStream startLogSegment(long txId) throws IOException { - wl.acquire(); - if (txId <= maxTxId.get()) { throw new IOException("We've already seen " + txId + ". A new stream cannot be created with it"); } try { + String existingInprogressNode = ci.read(); + if (null != existingInprogressNode + && zkc.exists(existingInprogressNode, false) != null) { + throw new IOException("Inprogress node already exists"); + } if (currentLedger != null) { // bookkeeper errored on last stream, clean up ledger currentLedger.close(); @@ -234,7 +237,8 @@ public class BookKeeperJournalManager im l.write(zkc, znodePath); maxTxId.store(txId); - return new BookKeeperEditLogOutputStream(conf, currentLedger, wl); + ci.update(znodePath); + return new BookKeeperEditLogOutputStream(conf, currentLedger); } catch (Exception e) { if (currentLedger != null) { try { @@ -270,7 +274,6 @@ public class BookKeeperJournalManager im + " doesn't exist"); } - wl.checkWriteLock(); EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, inprogressPath); @@ -307,13 +310,15 @@ public class BookKeeperJournalManager im } maxTxId.store(lastTxId); zkc.delete(inprogressPath, inprogressStat.getVersion()); + String inprogressPathFromCI = ci.read(); + if (inprogressPath.equals(inprogressPathFromCI)) { + ci.clear(); + } } catch (KeeperException e) { throw new IOException("Error finalising ledger", e); } catch (InterruptedException ie) { throw new IOException("Error finalising ledger", ie); - } finally { - wl.release(); - } + } } EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk) @@ -417,7 +422,6 @@ public class BookKeeperJournalManager im @Override public void recoverUnfinalizedSegments() throws IOException { - wl.acquire(); synchronized (this) { try { List children = zkc.getChildren(ledgerPath, false); @@ -445,10 +449,6 @@ public class BookKeeperJournalManager im } catch (InterruptedException ie) { throw new IOException("Interrupted getting list of inprogress segments", ie); - } finally { - if (wl.haveLock()) { - wl.release(); - } } } } Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java?rev=1343913&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java Tue May 29 18:50:07 2012 @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import java.io.IOException; +import java.net.InetAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; + +/** + * Distributed write permission lock, using ZooKeeper. Read the version number + * and return the current inprogress node path available in CurrentInprogress + * path. If it exist, caller can treat that some other client already operating + * on it. Then caller can take action. If there is no inprogress node exist, + * then caller can treat that there is no client operating on it. Later same + * caller should update the his newly created inprogress node path. At this + * point, if some other activities done on this node, version number might + * change, so update will fail. So, this read, update api will ensure that there + * is only node can continue further after checking with CurrentInprogress. + */ + +class CurrentInprogress { + private static final String CONTENT_DELIMITER = ","; + + static final Log LOG = LogFactory.getLog(CurrentInprogress.class); + + private final ZooKeeper zkc; + private final String currentInprogressNode; + private volatile int versionNumberForPermission = -1; + private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1; + private final String hostName = InetAddress.getLocalHost().toString(); + + CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException { + this.currentInprogressNode = lockpath; + this.zkc = zkc; + try { + Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false); + if (isCurrentInprogressNodeExists == null) { + try { + zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Node might created by other process at the same time. Ignore it. + if (LOG.isDebugEnabled()) { + LOG.debug(lockpath + " already created by other process.", e); + } + } + } + } catch (Exception e) { + throw new IOException("Exception accessing Zookeeper", e); + } + } + + /** + * Update the path with prepending version number and hostname + * + * @param path + * - to be updated in zookeeper + * @throws IOException + */ + void update(String path) throws IOException { + String content = CURRENT_INPROGRESS_LAYOUT_VERSION + + CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path; + try { + zkc.setData(this.currentInprogressNode, content.getBytes(), + this.versionNumberForPermission); + } catch (KeeperException e) { + throw new IOException("Exception when setting the data " + + "[layout version number,hostname,inprogressNode path]= [" + content + + "] to CurrentInprogress. ", e); + } catch (InterruptedException e) { + throw new IOException("Interrupted while setting the data " + + "[layout version number,hostname,inprogressNode path]= [" + content + + "] to CurrentInprogress", e); + } + LOG.info("Updated data[layout version number,hostname,inprogressNode path]" + + "= [" + content + "] to CurrentInprogress"); + } + + /** + * Read the CurrentInprogress node data from Zookeeper and also get the znode + * version number. Return the 3rd field from the data. i.e saved path with + * #update api + * + * @return available inprogress node path. returns null if not available. + * @throws IOException + */ + String read() throws IOException { + Stat stat = new Stat(); + byte[] data = null; + try { + data = zkc.getData(this.currentInprogressNode, false, stat); + } catch (KeeperException e) { + throw new IOException("Exception while reading the data from " + + currentInprogressNode, e); + } catch (InterruptedException e) { + throw new IOException("Interrupted while reading data from " + + currentInprogressNode, e); + } + this.versionNumberForPermission = stat.getVersion(); + if (data != null) { + String stringData = new String(data); + LOG.info("Read data[layout version number,hostname,inprogressNode path]" + + "= [" + stringData + "] from CurrentInprogress"); + String[] contents = stringData.split(CONTENT_DELIMITER); + assert contents.length == 3 : "As per the current data format, " + + "CurrentInprogress node data should contain 3 fields. " + + "i.e layout version number,hostname,inprogressNode path"; + String layoutVersion = contents[0]; + if (Long.valueOf(layoutVersion) > CURRENT_INPROGRESS_LAYOUT_VERSION) { + throw new IOException( + "Supported layout version of CurrentInprogress node is : " + + CURRENT_INPROGRESS_LAYOUT_VERSION + + " . Layout version of CurrentInprogress node in ZK is : " + + layoutVersion); + } + String inprogressNodePath = contents[2]; + return inprogressNodePath; + } else { + LOG.info("No data available in CurrentInprogress"); + } + return null; + } + + /** Clear the CurrentInprogress node data */ + void clear() throws IOException { + try { + zkc.setData(this.currentInprogressNode, null, versionNumberForPermission); + } catch (KeeperException e) { + throw new IOException( + "Exception when setting the data to CurrentInprogress node", e); + } catch (InterruptedException e) { + throw new IOException( + "Interrupted when setting the data to CurrentInprogress node", e); + } + LOG.info("Cleared the data from CurrentInprogress"); + } + +} \ No newline at end of file Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java?rev=1343913&r1=1343912&r2=1343913&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java Tue May 29 18:50:07 2012 @@ -215,8 +215,7 @@ public class TestBookKeeperAsHASharedDir } /** - * Test that two namenodes can't become primary at the same - * time. + * Test that two namenodes can't continue as primary */ @Test public void testMultiplePrimariesStarted() throws Exception { @@ -247,21 +246,17 @@ public class TestBookKeeperAsHASharedDir FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); fs.mkdirs(p1); nn1.getRpcServer().rollEditLog(); - try { - cluster.transitionToActive(1); - fail("Shouldn't have been able to start two primaries" - + " with single shared storage"); - } catch (ServiceFailedException sfe) { - assertTrue("Wrong exception", - sfe.getMessage().contains("Failed to start active services")); - } + cluster.transitionToActive(1); + fs = cluster.getFileSystem(0); // get the older active server. + // This edit log updation on older active should make older active + // shutdown. + fs.delete(p1, true); + verify(mockRuntime1, atLeastOnce()).exit(anyInt()); + verify(mockRuntime2, times(0)).exit(anyInt()); } finally { - verify(mockRuntime1, times(0)).exit(anyInt()); - verify(mockRuntime2, atLeastOnce()).exit(anyInt()); - if (cluster != null) { cluster.shutdown(); } } } -} \ No newline at end of file +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1343913&r1=1343912&r2=1343913&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Tue May 29 18:50:07 2012 @@ -361,6 +361,7 @@ public class TestBookKeeperJournalManage assertEquals("New bookie didn't start", numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10)); + bkjm.recoverUnfinalizedSegments(); out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java?rev=1343913&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java Tue May 29 18:50:07 2012 @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.util.LocalBookKeeper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests that read, update, clear api from CurrentInprogress + */ +public class TestCurrentInprogress { + private static final Log LOG = LogFactory.getLog(TestCurrentInprogress.class); + private static final String CURRENT_NODE_PATH = "/test"; + private static final String HOSTPORT = "127.0.0.1:2181"; + private static final int CONNECTION_TIMEOUT = 30000; + private static NIOServerCnxnFactory serverFactory; + private static ZooKeeperServer zks; + private static ZooKeeper zkc; + private static int ZooKeeperDefaultPort = 2181; + private static File zkTmpDir; + + private static ZooKeeper connectZooKeeper(String ensemble) + throws IOException, KeeperException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + ZooKeeper zkc = new ZooKeeper(HOSTPORT, 3600, new Watcher() { + public void process(WatchedEvent event) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + latch.countDown(); + } + } + }); + if (!latch.await(10, TimeUnit.SECONDS)) { + throw new IOException("Zookeeper took too long to connect"); + } + return zkc; + } + + @BeforeClass + public static void setupZooKeeper() throws Exception { + LOG.info("Starting ZK server"); + zkTmpDir = File.createTempFile("zookeeper", "test"); + zkTmpDir.delete(); + zkTmpDir.mkdir(); + try { + zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort); + serverFactory = new NIOServerCnxnFactory(); + serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10); + serverFactory.startup(zks); + } catch (Exception e) { + LOG.error("Exception while instantiating ZooKeeper", e); + } + boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT); + LOG.debug("ZooKeeper server up: " + b); + } + + @AfterClass + public static void shutDownServer() { + if (null != zks) { + zks.shutdown(); + } + zkTmpDir.delete(); + } + + @Before + public void setup() throws Exception { + zkc = connectZooKeeper(HOSTPORT); + } + + @After + public void teardown() throws Exception { + if (null != zkc) { + zkc.close(); + } + + } + + /** + * Tests that read should be able to read the data which updated with update + * api + */ + @Test + public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception { + String data = "inprogressNode"; + CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.update(data); + String inprogressNodePath = ci.read(); + assertEquals("Not returning inprogressZnode", "inprogressNode", + inprogressNodePath); + } + + /** + * Tests that read should return null if we clear the updated data in + * CurrentInprogress node + */ + @Test + public void testReadShouldReturnNullAfterClear() throws Exception { + CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.update("myInprogressZnode"); + ci.read(); + ci.clear(); + String inprogressNodePath = ci.read(); + assertEquals("Expecting null to be return", null, inprogressNodePath); + } + + /** + * Tests that update should throw IOE, if version number modifies between read + * and update + */ + @Test(expected = IOException.class) + public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead() + throws Exception { + CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.update("myInprogressZnode"); + assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci + .read()); + // Updating data in-between to change the data to change the version number + ci.update("YourInprogressZnode"); + ci.update("myInprogressZnode"); + } + +} \ No newline at end of file