Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
Mon Dec 24 04:50:02 2012
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.AsyncCallback;
@@ -33,12 +34,14 @@ import org.apache.zookeeper.AsyncCallbac
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Provided utilites for zookeeper access, etc.
*/
public class ZkUtils {
-
+ private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
/**
* Create zookeeper path recursively
*
@@ -94,10 +97,95 @@ public class ZkUtils {
}
+ private static class GetChildrenCtx { // mutable result holder shared by the blocking caller and the async callback; also used as the wait/notify monitor
+ int rc; // result code handed to the callback
+ boolean done = false; // flipped to true once the callback has run
+ List<String> children = null; // assigned only when rc equals Code.OK
+ }
+
+ /**
+ * Synchronously get all direct children of a single zk node.
+ *
+ * Delegates to the asynchronous overload of this method and blocks the
+ * calling thread until that overload's callback has run.
+ *
+ * @param zk
+ * zookeeper client
+ * @param node
+ * node path
+ * @return direct children
+ * @throws InterruptedException
+ * if the calling thread is interrupted while waiting for the callback
+ * @throws IOException
+ * if the callback reports a result code other than Code.OK
+ */
+ public static List<String> getChildrenInSingleNode(final ZooKeeper zk, final String
node)
+ throws InterruptedException, IOException {
+ final GetChildrenCtx ctx = new GetChildrenCtx();
+ getChildrenInSingleNode(zk, node, new GenericCallback<List<String>>()
{
+ @Override
+ public void operationComplete(int rc, List<String> ledgers) { // 'ledgers' holds the child names passed in by the async overload
+ synchronized (ctx) { // same monitor the caller waits on below
+ if (Code.OK.intValue() == rc) {
+ ctx.children = ledgers;
+ }
+ ctx.rc = rc;
+ ctx.done = true;
+ ctx.notifyAll(); // wake the thread blocked in ctx.wait()
+ }
+ }
+ });
+
+ synchronized (ctx) {
+ while (ctx.done == false) { // re-checked after every wakeup, so an early return from wait() cannot end the loop
+ ctx.wait();
+ }
+ }
+ if (Code.OK.intValue() != ctx.rc) { // note: the result code itself is not included in the exception below
+ throw new IOException("Error on getting children from node " + node);
+ }
+ return ctx.children;
+
+ }
+
+ /**
+ * Asynchronously get the direct children of a single node.
+ *
+ * Issues zk.sync(node) first and, only if that completes with Code.OK,
+ * issues zk.getChildren(node, false, ...). If either step completes with
+ * a different result code, the error is logged and cb is invoked with
+ * that code and a null list. On success cb is invoked with the result
+ * code and the list of children returned by ZooKeeper.
+ *
+ * @param zk
+ * zookeeper client
+ * @param node
+ * node path
+ * @param cb
+ * callback function; invoked exactly once per call on each of the
+ * paths described above
+ */
+ public static void getChildrenInSingleNode(final ZooKeeper zk, final String node,
+ final GenericCallback<List<String>> cb) {
+ zk.sync(node, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("ZK error syncing nodes when getting children: ", KeeperException
+ .create(KeeperException.Code.get(rc), path));
+ cb.operationComplete(rc, null);
+ return; // getChildren is not attempted when the sync step failed
+ }
+ zk.getChildren(node, false, new AsyncCallback.ChildrenCallback() { // second argument is the watch flag: no watch is registered
+ @Override
+ public void processResult(int rc, String path, Object ctx, List<String>
nodes) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("Error polling ZK for the available nodes: ", KeeperException
+ .create(KeeperException.Code.get(rc), path));
+ cb.operationComplete(rc, null);
+ return;
+ }
+
+ cb.operationComplete(rc, nodes); // rc equals Code.OK on this path
+
+ }
+ }, null); // no context object is passed to the children callback
+ }
+ }, null); // no context object is passed to the sync callback
+ }
+
/**
* Get new ZooKeeper client. Waits till the connection is complete. If
* connection is not successful within timeout, then throws back exception.
- *
+ *
* @param servers
* ZK servers connection string.
* @param timeout
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
Mon Dec 24 04:50:02 2012
@@ -27,9 +27,9 @@ import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -47,7 +47,7 @@ import junit.framework.TestCase;
public class LedgerCacheTest extends TestCase {
static Logger LOG = LoggerFactory.getLogger(LedgerCacheTest.class);
- ActiveLedgerManager activeLedgerManager;
+ SnapshotMap<Long, Boolean> activeLedgers;
LedgerManagerFactory ledgerManagerFactory;
LedgerCache ledgerCache;
ServerConfiguration conf;
@@ -75,7 +75,7 @@ public class LedgerCacheTest extends Tes
ledgerManagerFactory =
LedgerManagerFactory.newLedgerManagerFactory(conf, null);
- activeLedgerManager = ledgerManagerFactory.newActiveLedgerManager();
+ activeLedgers = new SnapshotMap<Long, Boolean>();
ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache;
}
@@ -83,7 +83,6 @@ public class LedgerCacheTest extends Tes
@After
public void tearDown() throws Exception {
bookie.ledgerStorage.shutdown();
- activeLedgerManager.close();
ledgerManagerFactory.uninitialize();
FileUtils.deleteDirectory(txnDir);
FileUtils.deleteDirectory(ledgerDir);
@@ -94,7 +93,7 @@ public class LedgerCacheTest extends Tes
ledgerCache.close();
}
ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache = new
LedgerCacheImpl(
- conf, activeLedgerManager, bookie.getLedgerDirsManager());
+ conf, activeLedgers, bookie.getLedgerDirsManager());
}
@Test
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
Mon Dec 24 04:50:02 2012
@@ -30,11 +30,13 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.bookie.GarbageCollector;
+import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.meta.ActiveLedgerManager.GarbageCollector;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.versioning.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +63,7 @@ public class GcLedgersTest extends Ledge
@Override
public void operationComplete(int rc, Long ledgerId) {
if (rc == BKException.Code.OK) {
- getActiveLedgerManager().addActiveLedger(ledgerId, true);
+ activeLedgers.put(ledgerId, true);
createdLedgers.add(ledgerId);
}
synchronized (expected) {
@@ -102,7 +104,8 @@ public class GcLedgersTest extends Ledge
for (int i=0; i<numRemovedLedgers; i++) {
long ledgerId = tmpList.get(i);
synchronized (removedLedgers) {
- getLedgerManager().deleteLedger(ledgerId, new GenericCallback<Void>()
{
+ getLedgerManager().removeLedgerMetadata(ledgerId, Version.ANY,
+ new GenericCallback<Void>() {
@Override
public void operationComplete(int rc, Void result) {
synchronized (removedLedgers) {
@@ -118,14 +121,15 @@ public class GcLedgersTest extends Ledge
final CountDownLatch inGcProgress = new CountDownLatch(1);
final CountDownLatch createLatch = new CountDownLatch(1);
final CountDownLatch endLatch = new CountDownLatch(2);
-
+ final GarbageCollector garbageCollector =
+ new ScanAndCompareGarbageCollector(getLedgerManager(), activeLedgers);
Thread gcThread = new Thread() {
@Override
public void run() {
- getActiveLedgerManager().garbageCollectLedgers(new GarbageCollector() {
+ garbageCollector.gc(new GarbageCollector.GarbageCleaner() {
boolean paused = false;
@Override
- public void gc(long ledgerId) {
+ public void clean(long ledgerId) {
if (!paused) {
inGcProgress.countDown();
try {
@@ -165,10 +169,10 @@ public class GcLedgersTest extends Ledge
// test ledgers
for (Long ledger : removedLedgers) {
- assertFalse(getActiveLedgerManager().containsActiveLedger(ledger));
+ assertFalse(activeLedgers.containsKey(ledger));
}
for (Long ledger : createdLedgers) {
- assertTrue(getActiveLedgerManager().containsActiveLedger(ledger));
+ assertTrue(activeLedgers.containsKey(ledger));
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
Mon Dec 24 04:50:02 2012
@@ -24,8 +24,8 @@ package org.apache.bookkeeper.meta;
import java.util.Arrays;
import java.util.Collection;
-import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,8 +36,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import junit.framework.TestCase;
-
/**
* Test case to run over serveral ledger managers
*/
@@ -47,10 +45,11 @@ public abstract class LedgerManagerTestC
LedgerManagerFactory ledgerManagerFactory;
LedgerManager ledgerManager = null;
- ActiveLedgerManager activeLedgerManager = null;
+ SnapshotMap<Long, Boolean> activeLedgers = null;
public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls)
{
super(0); // NOTE(review): the meaning of 0 is defined by the superclass constructor, which is not visible here
+ activeLedgers = new SnapshotMap<Long, Boolean>(); // a new map is allocated for each test-case instance
baseConf.setLedgerManagerFactoryClass(lmFactoryCls); // records the factory class supplied by the caller on baseConf
}
@@ -61,13 +60,6 @@ public abstract class LedgerManagerTestC
return ledgerManager;
}
- public ActiveLedgerManager getActiveLedgerManager() {
- if (null == activeLedgerManager) {
- activeLedgerManager = ledgerManagerFactory.newActiveLedgerManager();
- }
- return activeLedgerManager;
- }
-
@Parameters
public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] {
@@ -89,9 +81,6 @@ public abstract class LedgerManagerTestC
if (null != ledgerManager) {
ledgerManager.close();
}
- if (null != activeLedgerManager) {
- activeLedgerManager.close();
- }
ledgerManagerFactory.uninitialize();
super.tearDown();
}
|