hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From umamah...@apache.org
Subject svn commit: r1343913 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src: main/java/org/apache/hadoop/contrib/bkjournal/ test/java/org/apache/hadoop/contrib/bkjournal/
Date Tue, 29 May 2012 18:50:08 GMT
Author: umamahesh
Date: Tue May 29 18:50:07 2012
New Revision: 1343913

URL: http://svn.apache.org/viewvc?rev=1343913&view=rev
Log:
HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down due to delay in
clearing of lock. Contributed by Uma Maheswara Rao G.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
Tue May 29 18:50:07 2012
@@ -56,15 +56,13 @@ class BookKeeperEditLogOutputStream
   private CountDownLatch syncLatch;
   private final AtomicInteger transmitResult
     = new AtomicInteger(BKException.Code.OK);
-  private final WriteLock wl;
   private final Writer writer;
 
   /**
    * Construct an edit log output stream which writes to a ledger.
 
    */
-  protected BookKeeperEditLogOutputStream(Configuration conf,
-                                          LedgerHandle lh, WriteLock wl)
+  protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh)
       throws IOException {
     super();
 
@@ -72,8 +70,6 @@ class BookKeeperEditLogOutputStream
     outstandingRequests = new AtomicInteger(0);
     syncLatch = null;
     this.lh = lh;
-    this.wl = wl;
-    this.wl.acquire();
     this.writer = new Writer(bufCurrent);
     this.transmissionThreshold
       = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
@@ -108,7 +104,6 @@ class BookKeeperEditLogOutputStream
       throw new IOException("BookKeeper error during abort", bke);
     }
 
-    wl.release();
   }
 
   @Override
@@ -118,8 +113,6 @@ class BookKeeperEditLogOutputStream
 
   @Override
   public void write(FSEditLogOp op) throws IOException {
-    wl.checkWriteLock();
-
     writer.writeOp(op);
 
     if (bufCurrent.getLength() > transmissionThreshold) {
@@ -129,19 +122,15 @@ class BookKeeperEditLogOutputStream
 
   @Override
   public void setReadyToFlush() throws IOException {
-    wl.checkWriteLock();
-
     transmit();
 
-    synchronized(this) {
+    synchronized (this) {
       syncLatch = new CountDownLatch(outstandingRequests.get());
     }
   }
 
   @Override
   public void flushAndSync() throws IOException {
-    wl.checkWriteLock();
-
     assert(syncLatch != null);
     try {
       syncLatch.await();
@@ -164,8 +153,6 @@ class BookKeeperEditLogOutputStream
    * are never called at the same time.
    */
   private void transmit() throws IOException {
-    wl.checkWriteLock();
-
     if (!transmitResult.compareAndSet(BKException.Code.OK,
                                      BKException.Code.OK)) {
       throw new IOException("Trying to write to an errored stream;"

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
Tue May 29 18:50:07 2012
@@ -117,7 +117,7 @@ public class BookKeeperJournalManager im
   private final ZooKeeper zkc;
   private final Configuration conf;
   private final BookKeeper bkc;
-  private final WriteLock wl;
+  private final CurrentInprogress ci;
   private final String ledgerPath;
   private final MaxTxId maxTxId;
   private final int ensembleSize;
@@ -155,7 +155,7 @@ public class BookKeeperJournalManager im
 
     ledgerPath = zkPath + "/ledgers";
     String maxTxIdPath = zkPath + "/maxtxid";
-    String lockPath = zkPath + "/lock";
+    String currentInprogressNodePath = zkPath + "/CurrentInprogress";
     String versionPath = zkPath + "/version";
     digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
                         BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
@@ -192,7 +192,7 @@ public class BookKeeperJournalManager im
       throw new IOException("Error initializing zk", e);
     }
 
-    wl = new WriteLock(zkc, lockPath);
+    ci = new CurrentInprogress(zkc, currentInprogressNodePath);
     maxTxId = new MaxTxId(zkc, maxTxIdPath);
   }
 
@@ -207,13 +207,16 @@ public class BookKeeperJournalManager im
    */
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
-    wl.acquire();
-
     if (txId <= maxTxId.get()) {
       throw new IOException("We've already seen " + txId
           + ". A new stream cannot be created with it");
     }
     try {
+      String existingInprogressNode = ci.read();
+      if (null != existingInprogressNode
+          && zkc.exists(existingInprogressNode, false) != null) {
+        throw new IOException("Inprogress node already exists");
+      }
       if (currentLedger != null) {
         // bookkeeper errored on last stream, clean up ledger
         currentLedger.close();
@@ -234,7 +237,8 @@ public class BookKeeperJournalManager im
       l.write(zkc, znodePath);
 
       maxTxId.store(txId);
-      return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
+      ci.update(znodePath);
+      return new BookKeeperEditLogOutputStream(conf, currentLedger);
     } catch (Exception e) {
       if (currentLedger != null) {
         try {
@@ -270,7 +274,6 @@ public class BookKeeperJournalManager im
                               + " doesn't exist");
       }
 
-      wl.checkWriteLock();
       EditLogLedgerMetadata l
         =  EditLogLedgerMetadata.read(zkc, inprogressPath);
 
@@ -307,13 +310,15 @@ public class BookKeeperJournalManager im
       }
       maxTxId.store(lastTxId);
       zkc.delete(inprogressPath, inprogressStat.getVersion());
+      String inprogressPathFromCI = ci.read();
+      if (inprogressPath.equals(inprogressPathFromCI)) {
+        ci.clear();
+      }
     } catch (KeeperException e) {
       throw new IOException("Error finalising ledger", e);
     } catch (InterruptedException ie) {
       throw new IOException("Error finalising ledger", ie);
-    } finally {
-      wl.release();
-    }
+    } 
   }
 
   EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
@@ -417,7 +422,6 @@ public class BookKeeperJournalManager im
 
   @Override
   public void recoverUnfinalizedSegments() throws IOException {
-    wl.acquire();
     synchronized (this) {
       try {
         List<String> children = zkc.getChildren(ledgerPath, false);
@@ -445,10 +449,6 @@ public class BookKeeperJournalManager im
       } catch (InterruptedException ie) {
         throw new IOException("Interrupted getting list of inprogress segments",
                               ie);
-      } finally {
-        if (wl.haveLock()) {
-          wl.release();
-        }
       }
     }
   }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java?rev=1343913&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
(added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
Tue May 29 18:50:07 2012
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Distributed write permission lock, using ZooKeeper. Read the version number
+ * and return the current inprogress node path available in CurrentInprogress
+ * path. If it exist, caller can treat that some other client already operating
+ * on it. Then caller can take action. If there is no inprogress node exist,
+ * then caller can treat that there is no client operating on it. Later same
+ * caller should update the his newly created inprogress node path. At this
+ * point, if some other activities done on this node, version number might
+ * change, so update will fail. So, this read, update api will ensure that there
+ * is only node can continue further after checking with CurrentInprogress.
+ */
+
+class CurrentInprogress {
+  private static final String CONTENT_DELIMITER = ",";
+
+  static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
+
+  private final ZooKeeper zkc;
+  private final String currentInprogressNode;
+  private volatile int versionNumberForPermission = -1;
+  private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1; 
+  private final String hostName = InetAddress.getLocalHost().toString();
+
+  CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
+    this.currentInprogressNode = lockpath;
+    this.zkc = zkc;
+    try {
+      Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false);
+      if (isCurrentInprogressNodeExists == null) {
+        try {
+          zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE,
+                  CreateMode.PERSISTENT);
+        } catch (NodeExistsException e) {
+          // Node might created by other process at the same time. Ignore it.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(lockpath + " already created by other process.", e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Exception accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Update the path with prepending version number and hostname
+   * 
+   * @param path
+   *          - to be updated in zookeeper
+   * @throws IOException
+   */
+  void update(String path) throws IOException {
+    String content = CURRENT_INPROGRESS_LAYOUT_VERSION
+        + CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path;
+    try {
+      zkc.setData(this.currentInprogressNode, content.getBytes(),
+          this.versionNumberForPermission);
+    } catch (KeeperException e) {
+      throw new IOException("Exception when setting the data "
+          + "[layout version number,hostname,inprogressNode path]= [" + content
+          + "] to CurrentInprogress. ", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while setting the data "
+          + "[layout version number,hostname,inprogressNode path]= [" + content
+          + "] to CurrentInprogress", e);
+    }
+    LOG.info("Updated data[layout version number,hostname,inprogressNode path]"
+        + "= [" + content + "] to CurrentInprogress");
+  }
+
+  /**
+   * Read the CurrentInprogress node data from Zookeeper and also get the znode
+   * version number. Return the 3rd field from the data. i.e saved path with
+   * #update api
+   * 
+   * @return available inprogress node path. returns null if not available.
+   * @throws IOException
+   */
+  String read() throws IOException {
+    Stat stat = new Stat();
+    byte[] data = null;
+    try {
+      data = zkc.getData(this.currentInprogressNode, false, stat);
+    } catch (KeeperException e) {
+      throw new IOException("Exception while reading the data from "
+          + currentInprogressNode, e);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while reading data from "
+          + currentInprogressNode, e);
+    }
+    this.versionNumberForPermission = stat.getVersion();
+    if (data != null) {
+      String stringData = new String(data);
+      LOG.info("Read data[layout version number,hostname,inprogressNode path]"
+          + "= [" + stringData + "] from CurrentInprogress");
+      String[] contents = stringData.split(CONTENT_DELIMITER);
+      assert contents.length == 3 : "As per the current data format, "
+          + "CurrentInprogress node data should contain 3 fields. "
+          + "i.e layout version number,hostname,inprogressNode path";
+      String layoutVersion = contents[0];
+      if (Long.valueOf(layoutVersion) > CURRENT_INPROGRESS_LAYOUT_VERSION) {
+        throw new IOException(
+            "Supported layout version of CurrentInprogress node is : "
+                + CURRENT_INPROGRESS_LAYOUT_VERSION
+                + " . Layout version of CurrentInprogress node in ZK is : "
+                + layoutVersion);
+      }
+      String inprogressNodePath = contents[2];
+      return inprogressNodePath;
+    } else {
+      LOG.info("No data available in CurrentInprogress");
+    }
+    return null;
+  }
+
+  /** Clear the CurrentInprogress node data */
+  void clear() throws IOException {
+    try {
+      zkc.setData(this.currentInprogressNode, null, versionNumberForPermission);
+    } catch (KeeperException e) {
+      throw new IOException(
+          "Exception when setting the data to CurrentInprogress node", e);
+    } catch (InterruptedException e) {
+      throw new IOException(
+          "Interrupted when setting the data to CurrentInprogress node", e);
+    }
+    LOG.info("Cleared the data from CurrentInprogress");
+  }
+
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
Tue May 29 18:50:07 2012
@@ -215,8 +215,7 @@ public class TestBookKeeperAsHASharedDir
   }
 
   /**
-   * Test that two namenodes can't become primary at the same
-   * time.
+   * Test that two namenodes can't continue as primary
    */
   @Test
   public void testMultiplePrimariesStarted() throws Exception {
@@ -247,21 +246,17 @@ public class TestBookKeeperAsHASharedDir
       FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
       fs.mkdirs(p1);
       nn1.getRpcServer().rollEditLog();
-      try {
-        cluster.transitionToActive(1);
-        fail("Shouldn't have been able to start two primaries"
-             + " with single shared storage");
-      } catch (ServiceFailedException sfe) {
-        assertTrue("Wrong exception",
-            sfe.getMessage().contains("Failed to start active services"));
-      }
+      cluster.transitionToActive(1);
+      fs = cluster.getFileSystem(0); // get the older active server.
+      // This edit log updation on older active should make older active
+      // shutdown.
+      fs.delete(p1, true);
+      verify(mockRuntime1, atLeastOnce()).exit(anyInt());
+      verify(mockRuntime2, times(0)).exit(anyInt());
     } finally {
-      verify(mockRuntime1, times(0)).exit(anyInt());
-      verify(mockRuntime2, atLeastOnce()).exit(anyInt());
-
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1343913&r1=1343912&r2=1343913&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
Tue May 29 18:50:07 2012
@@ -361,6 +361,7 @@ public class TestBookKeeperJournalManage
 
       assertEquals("New bookie didn't start",
                    numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
+      bkjm.recoverUnfinalizedSegments();
       out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java?rev=1343913&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
(added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java
Tue May 29 18:50:07 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that read, update, clear api from CurrentInprogress
+ */
+public class TestCurrentInprogress {
+  private static final Log LOG = LogFactory.getLog(TestCurrentInprogress.class);
+  private static final String CURRENT_NODE_PATH = "/test";
+  private static final String HOSTPORT = "127.0.0.1:2181";
+  private static final int CONNECTION_TIMEOUT = 30000;
+  private static NIOServerCnxnFactory serverFactory;
+  private static ZooKeeperServer zks;
+  private static ZooKeeper zkc;
+  private static int ZooKeeperDefaultPort = 2181;
+  private static File zkTmpDir;
+
+  private static ZooKeeper connectZooKeeper(String ensemble)
+      throws IOException, KeeperException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    ZooKeeper zkc = new ZooKeeper(HOSTPORT, 3600, new Watcher() {
+      public void process(WatchedEvent event) {
+        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+          latch.countDown();
+        }
+      }
+    });
+    if (!latch.await(10, TimeUnit.SECONDS)) {
+      throw new IOException("Zookeeper took too long to connect");
+    }
+    return zkc;
+  }
+
+  @BeforeClass
+  public static void setupZooKeeper() throws Exception {
+    LOG.info("Starting ZK server");
+    zkTmpDir = File.createTempFile("zookeeper", "test");
+    zkTmpDir.delete();
+    zkTmpDir.mkdir();
+    try {
+      zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort);
+      serverFactory = new NIOServerCnxnFactory();
+      serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
+      serverFactory.startup(zks);
+    } catch (Exception e) {
+      LOG.error("Exception while instantiating ZooKeeper", e);
+    }
+    boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
+    LOG.debug("ZooKeeper server up: " + b);
+  }
+
+  @AfterClass
+  public static void shutDownServer() {
+    if (null != zks) {
+      zks.shutdown();
+    }
+    zkTmpDir.delete();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    zkc = connectZooKeeper(HOSTPORT);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (null != zkc) {
+      zkc.close();
+    }
+
+  }
+
+  /**
+   * Tests that read should be able to read the data which updated with update
+   * api
+   */
+  @Test
+  public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
+    String data = "inprogressNode";
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update(data);
+    String inprogressNodePath = ci.read();
+    assertEquals("Not returning inprogressZnode", "inprogressNode",
+        inprogressNodePath);
+  }
+
+  /**
+   * Tests that read should return null if we clear the updated data in
+   * CurrentInprogress node
+   */
+  @Test
+  public void testReadShouldReturnNullAfterClear() throws Exception {
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update("myInprogressZnode");
+    ci.read();
+    ci.clear();
+    String inprogressNodePath = ci.read();
+    assertEquals("Expecting null to be return", null, inprogressNodePath);
+  }
+
+  /**
+   * Tests that update should throw IOE, if version number modifies between read
+   * and update
+   */
+  @Test(expected = IOException.class)
+  public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead()
+      throws Exception {
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update("myInprogressZnode");
+    assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci
+        .read());
+    // Updating data in-between to change the data to change the version number
+    ci.update("YourInprogressZnode");
+    ci.update("myInprogressZnode");
+  }
+
+}
\ No newline at end of file



Mime
View raw message