bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [01/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
Date Wed, 16 Mar 2016 03:44:11 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 410ff7263 -> 9a8d62b1d


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java
deleted file mode 100644
index a54d0d4..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.SynchronousQueue;
-
-import junit.framework.Assert;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-
-public class TestTopicBasedLoadShedder {
-
-    final protected SynchronousQueue<Boolean> statusQueue = new SynchronousQueue<Boolean>();
-    private int myTopics = 10;
-    private int numHubs = 10;
-    private List<ByteString> mockTopicList;
-    private final HubLoad infiniteMaxLoad = new HubLoad(10000000);
-    Map<HubInfo, HubLoad> mockLoadMap = new HashMap<HubInfo, HubLoad>();
-
-    class MockTopicBasedLoadShedder extends TopicBasedLoadShedder {
-        // This is set by the reduceLoadTo function.
-        public HubLoad targetLoad;
-        public MockTopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList,
-                                         Double tolerancePercentage, HubLoad maxLoadToShed) {
-            super(tm, topicList, tolerancePercentage, maxLoadToShed);
-        }
-        @Override
-        public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) {
-            this.targetLoad = targetLoad;
-            // Indicates that we released these many topics.
-            callback.operationFinished(ctx, targetLoad.toHubLoadData().getNumTopics());
-        }
-    }
-    public Callback<Boolean> getShedLoadCallback(final MockTopicBasedLoadShedder ls, final HubLoad expected,
-                                                 final Boolean shouldRelease, final Boolean shouldFail) {
-        return new Callback<Boolean>() {
-            @Override
-            public void operationFinished(Object o, Boolean aBoolean) {
-                Boolean status = false;
-                status = (aBoolean == shouldRelease);
-                if (shouldRelease) {
-                    status &= (ls.targetLoad != null);
-                    status &= (expected.numTopics == ls.targetLoad.numTopics);
-                }
-                final Boolean statusToPut = status;
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        ConcurrencyUtils.put(statusQueue, statusToPut);
-                    }
-                }).start();
-            }
-
-            @Override
-            public void operationFailed(Object o, PubSubException e) {
-                new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        ConcurrencyUtils.put(statusQueue, shouldFail);
-                    }
-                }).start();
-            }
-        };
-    }
-
-    private List<ByteString> getMockTopicList(int numTopics) {
-        List<ByteString> topics = new ArrayList<ByteString>();
-        for (int i = 0; i < numTopics; i++) {
-            topics.add(ByteString.copyFromUtf8("MyTopic_" + i));
-        }
-        return topics;
-    }
-
-    private HubInfo getHubInfo(int hubNum) {
-        return new HubInfo(new HedwigSocketAddress("myhub.testdomain.foo"+hubNum+":4080:4080"), 0);
-    }
-
-    private synchronized void initialize(int myTopics, int numHubs, int[] otherHubsLoad) {
-        if (null != otherHubsLoad) {
-            Assert.assertTrue(otherHubsLoad.length == numHubs - 1);
-        }
-        this.myTopics = myTopics;
-        mockTopicList = getMockTopicList(this.myTopics);
-        this.numHubs = numHubs;
-        this.mockLoadMap.clear();
-        this.mockLoadMap.put(getHubInfo(0), new HubLoad(this.myTopics));
-        for (int i = 1; i < this.numHubs; i++) {
-            this.mockLoadMap.put(getHubInfo(i), new HubLoad(otherHubsLoad[i-1]));
-        }
-    }
-
-    private int[] getEqualLoadDistributionArray(int n, int load) {
-        if (n == 0) {
-            return null;
-        }
-        int[] retLoad = new int[n];
-        Arrays.fill(retLoad, load);
-        return retLoad;
-    }
-
-    @Test(timeout = 60000)
-    public synchronized  void testAllHubsSameTopics() throws Exception {
-        // All hubs have the same number of topics. We should not release any topics even with a
-        // tolerance of 0.0.
-        initialize(10, 10, getEqualLoadDistributionArray(9, 10));
-        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
-        Assert.assertTrue(statusQueue.take());
-    }
-
-    @Test(timeout = 60000)
-    public synchronized void testOneHubUnequalTopics() throws Exception {
-        // The hub has 20 topics while the average is 11. Should reduce the load to 11.
-        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
-        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
-        Assert.assertTrue(statusQueue.take());
-    }
-
-    @Test(timeout = 60000)
-    public synchronized void testOneHubUnequalTopicsWithTolerance() throws Exception {
-        // The hub has 20 topics and average is 11. Should still release as tolerance level of 50.0 is
-        // breached. Should get down to average.
-        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
-        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 50.0, infiniteMaxLoad);
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
-        Assert.assertTrue(statusQueue.take());
-
-        // A tolerance level of 100.0 should result in the hub not releasing topics.
-        tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 100.0, infiniteMaxLoad);
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
-        Assert.assertTrue(statusQueue.take());
-    }
-
-    @Test(timeout = 60000)
-    public synchronized void testMaxLoadShed() throws Exception {
-        // The hub should not shed more than maxLoadShed topics.
-        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
-        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(5));
-        // Our load should reduce to 15.
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(15), true, false), null);
-        Assert.assertTrue(statusQueue.take());
-
-        // We should reduce to 11 even when maxLoadShed and average result in the same
-        // values
-        tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(9));
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
-        Assert.assertTrue(statusQueue.take());
-    }
-
-    @Test(timeout = 60000)
-    public synchronized void testSingleHubLoadShed() throws Exception {
-        // If this is the only hub in the cluster, it should not release any topics.
-        initialize(20, 1, null);
-        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
-        Assert.assertTrue(statusQueue.take());
-    }
-
-    @Test(timeout = 60000)
-    public synchronized void testUnderloadedClusterLoadShed() throws Exception {
-        // Hold on to at least one topic while shedding load (if cluster is underloaded)
-        initialize(5, 10, getEqualLoadDistributionArray(9, 0));
-        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
-        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(1), true, false), null);
-        Assert.assertTrue(statusQueue.take());
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
deleted file mode 100644
index 90e77b2..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.CompositeException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.util.Pair;
-import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestZkTopicManager extends ZooKeeperTestBase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TestZkTopicManager.class);
-
-    protected ZkTopicManager tm;
-
-    protected class CallbackQueue<T> implements Callback<T> {
-        SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>();
-
-        public SynchronousQueue<Either<T, Exception>> getQueue() {
-            return q;
-        }
-
-        public Either<T, Exception> take() throws InterruptedException {
-            return q.take();
-        }
-
-        @Override
-        public void operationFailed(Object ctx, final PubSubException exception) {
-            LOG.error("got exception: " + exception);
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception));
-                }
-            }).start();
-        }
-
-        @Override
-        public void operationFinished(Object ctx, final T resultOfOperation) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null));
-                }
-            }).start();
-        }
-    }
-
-    protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>();
-    protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>();
-    protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>();
-
-    protected ByteString topic = ByteString.copyFromUtf8("topic");
-    protected ServerConfiguration cfg;
-    protected HedwigSocketAddress me;
-    protected ScheduledExecutorService scheduler;
-
-    private volatile int DEFAULT_MAX_NUM_TOPICS = Integer.MAX_VALUE;
-    private volatile int DEFAULT_RETENTION_SECS_AFTER_ACCESS = 0;
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        cfg = new ServerConfiguration() {
-            @Override
-            public int getRetentionSecsAfterAccess() {
-                return DEFAULT_RETENTION_SECS_AFTER_ACCESS;
-            }
-            @Override
-            public int getMaxNumTopics() {
-                return DEFAULT_MAX_NUM_TOPICS;
-            }
-        };
-        me = cfg.getServerAddr();
-        scheduler = Executors.newSingleThreadScheduledExecutor();
-        tm = new ZkTopicManager(zk, cfg, scheduler);
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        tm.stop();
-        super.tearDown();
-    }
-
-    @Test(timeout=60000)
-    public void testGetOwnerSingle() throws Exception {
-        tm.getOwner(topic, false, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-    }
-
-    protected ByteString mkTopic(int i) {
-        return ByteString.copyFromUtf8(topic.toStringUtf8() + i);
-    }
-
-    protected <T> T check(Either<T, Exception> ex) throws Exception {
-        if (ex.left() == null)
-            throw ex.right();
-        else
-            return ex.left();
-    }
-
-    public static class CustomServerConfiguration extends ServerConfiguration {
-        int port;
-
-        public CustomServerConfiguration(int port) {
-            this.port = port;
-        }
-
-        @Override
-        public int getServerPort() {
-            return port;
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testGetOwnerMulti() throws Exception {
-        ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1), cfg2 = new CustomServerConfiguration(
-            cfg.getServerPort() + 2);
-        // TODO change cfg1 cfg2 params
-        ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler),
-                       tm2 = new ZkTopicManager(zk, cfg2, scheduler);
-
-        tm.getOwner(topic, false, addrCbq, null);
-        HedwigSocketAddress owner = check(addrCbq.take());
-
-        // If we were told to have another person claim the topic, make them
-        // claim the topic.
-        if (owner.getPort() == cfg1.getServerPort())
-            tm1.getOwner(topic, true, addrCbq, null);
-        else if (owner.getPort() == cfg2.getServerPort())
-            tm2.getOwner(topic, true, addrCbq, null);
-        if (owner.getPort() != cfg.getServerPort())
-            Assert.assertEquals(owner, check(addrCbq.take()));
-
-        for (int i = 0; i < 100; ++i) {
-            tm.getOwner(topic, false, addrCbq, null);
-            Assert.assertEquals(owner, check(addrCbq.take()));
-
-            tm1.getOwner(topic, false, addrCbq, null);
-            Assert.assertEquals(owner, check(addrCbq.take()));
-
-            tm2.getOwner(topic, false, addrCbq, null);
-            Assert.assertEquals(owner, check(addrCbq.take()));
-        }
-
-        // Give us 100 chances to choose another owner if not shouldClaim.
-        for (int i = 0; i < 100; ++i) {
-            if (!owner.equals(me))
-                break;
-            tm.getOwner(mkTopic(i), false, addrCbq, null);
-            owner = check(addrCbq.take());
-            if (i == 99)
-                Assert.fail("Never chose another owner");
-        }
-
-        // Make sure we always choose ourselves if shouldClaim.
-        for (int i = 0; i < 100; ++i) {
-            tm.getOwner(mkTopic(100), true, addrCbq, null);
-            Assert.assertEquals(me, check(addrCbq.take()));
-        }
-
-        tm1.stop();
-        tm2.stop();
-    }
-
-    @Test(timeout=60000)
-    public void testLoadBalancing() throws Exception {
-        tm.getOwner(topic, false, addrCbq, null);
-
-        Assert.assertEquals(me, check(addrCbq.take()));
-
-        ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
-        TopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler);
-
-        ByteString topic1 = mkTopic(1);
-        tm.getOwner(topic1, false, addrCbq, null);
-        Assert.assertEquals(cfg1.getServerAddr(), check(addrCbq.take()));
-
-        tm1.stop();
-    }
-
-    class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
-        boolean failure;
-        SynchronousQueue<Pair<ByteString, Boolean>> bsQueue;
-
-        public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) {
-            this.bsQueue = bsQueue;
-        }
-
-        public void setFailure(boolean failure) {
-            this.failure = failure;
-        }
-
-        @Override
-        public void lostTopic(final ByteString topic) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(bsQueue, Pair.of(topic, false));
-                }
-            }).start();
-        }
-
-        public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    ConcurrencyUtils.put(bsQueue, Pair.of(topic, true));
-                    if (failure) {
-                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
-                    } else {
-                        callback.operationFinished(ctx, null);
-                    }
-                }
-            }).start();
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testOwnershipChange() throws Exception {
-        SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>();
-
-        StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue);
-
-        tm.addTopicOwnershipChangeListener(listener);
-
-        // regular acquire
-        tm.getOwner(topic, true, addrCbq, null);
-        Pair<ByteString, Boolean> pair = bsQueue.take();
-        Assert.assertEquals(topic, pair.first());
-        Assert.assertTrue(pair.second());
-        Assert.assertEquals(me, check(addrCbq.take()));
-        assertOwnershipNodeExists();
-
-        // topic that I already own
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Assert.assertTrue(bsQueue.isEmpty());
-        assertOwnershipNodeExists();
-
-        // regular release
-        tm.releaseTopic(topic, cb, null);
-        pair = bsQueue.take();
-        Assert.assertEquals(topic, pair.first());
-        Assert.assertFalse(pair.second());
-        Assert.assertTrue(queue.take());
-        assertOwnershipNodeDoesntExist();
-
-        // releasing topic that I don't own
-        tm.releaseTopic(mkTopic(0), cb, null);
-        Assert.assertTrue(queue.take());
-        Assert.assertTrue(bsQueue.isEmpty());
-
-        // set listener to return error
-        listener.setFailure(true);
-
-        tm.getOwner(topic, true, addrCbq, null);
-        pair = bsQueue.take();
-        Assert.assertEquals(topic, pair.first());
-        Assert.assertTrue(pair.second());
-        Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
-                            .getExceptions().iterator().next().getClass());
-        Assert.assertFalse(null != tm.topics.getIfPresent(topic));
-        Thread.sleep(100);
-        assertOwnershipNodeDoesntExist();
-
-    }
-
-    public void assertOwnershipNodeExists() throws Exception {
-        byte[] data = zk.getData(tm.hubPath(topic), false, null);
-        Assert.assertEquals(HubInfo.parse(new String(data)).getAddress(),
-                            tm.addr);
-    }
-
-    public void assertOwnershipNodeDoesntExist() throws Exception {
-        try {
-            zk.getData(tm.hubPath(topic), false, null);
-            Assert.assertTrue(false);
-        } catch (KeeperException e) {
-            Assert.assertEquals(e.code(), KeeperException.Code.NONODE);
-        }
-    }
-
-    @Test(timeout=60000)
-    public void testZKClientDisconnected() throws Exception {
-        // First assert ownership of the topic
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-
-        // Suspend the ZKTopicManager and make sure calls to getOwner error out
-        tm.isSuspended = true;
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass());
-        // Release the topic. This should not error out even if suspended.
-        tm.releaseTopic(topic, cb, null);
-        Assert.assertTrue(queue.take());
-        assertOwnershipNodeDoesntExist();
-
-        // Restart the ZKTopicManager and make sure calls to getOwner are okay
-        tm.isSuspended = false;
-        tm.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        assertOwnershipNodeExists();
-    }
-
-    @Test(timeout=60000)
-    public void testRetentionAfterAccess() throws Exception {
-        DEFAULT_RETENTION_SECS_AFTER_ACCESS = 5;
-        ZkTopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler);
-        tm1.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Thread.sleep(6000L);
-        tm1.topics.cleanUp();
-        Thread.sleep(2000L);
-        assertOwnershipNodeDoesntExist();
-        tm1.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Thread.sleep(1000L);
-        tm1.topics.cleanUp();
-        Thread.sleep(2000L);
-        assertOwnershipNodeExists();
-
-        tm1.stop();
-    }
-
-    @Test(timeout=60000)
-    public void testMaxNumTopics() throws Exception {
-        DEFAULT_MAX_NUM_TOPICS = 1;
-        TopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler);
-        tm1.getOwner(topic, true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        assertOwnershipNodeExists();
-        tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"),
-                     true, addrCbq, null);
-        Assert.assertEquals(me, check(addrCbq.take()));
-        Thread.sleep(2000L);
-        assertOwnershipNodeDoesntExist();
-        tm1.stop();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java b/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java
deleted file mode 100644
index e025e76..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.zookeeper;
-
-import java.util.Arrays;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestZkUtils extends ZooKeeperTestBase {
-
-    @Test(timeout=60000)
-    public void testCreateFullPathOptimistic() throws Exception {
-        testPath("/a/b/c", CreateMode.EPHEMERAL);
-
-        testPath("/b/c/d", CreateMode.PERSISTENT);
-
-        testPath("/b/c/d/e", CreateMode.PERSISTENT);
-
-    }
-
-    void testPath(String path, CreateMode mode) throws Exception {
-        byte[] data = new byte[] { 77 };
-        ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, mode, strCb, null);
-        Assert.assertTrue(queue.take());
-        Assert.assertTrue(Arrays.equals(data, zk.getData(path, false, null)));
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java b/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java
deleted file mode 100644
index 4213059..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.zookeeper;
-
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Callback;
-import org.apache.bookkeeper.test.PortManager;
-
-/**
- * This is a base class for any tests that need a ZooKeeper client/server setup.
- *
- */
-public abstract class ZooKeeperTestBase extends ClientBase {
-
-    protected ZooKeeper zk;
-
-    protected SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
-
-    protected Callback<Void> cb = new Callback<Void>() {
-
-        @Override
-        public void operationFinished(Object ctx, Void result) {
-            new Thread(new Runnable() {
-                public void run() {
-                    ConcurrencyUtils.put(queue, true);
-                }
-            }).start();
-        }
-
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            new Thread(new Runnable() {
-                public void run() {
-                    ConcurrencyUtils.put(queue, false);
-                }
-            }).start();
-        }
-    };
-
-    protected AsyncCallback.StringCallback strCb = new AsyncCallback.StringCallback() {
-        @Override
-        public void processResult(int rc, String path, Object ctx, String name) {
-            ConcurrencyUtils.put(queue, rc == Code.OK.intValue());
-        }
-    };
-
-    protected AsyncCallback.VoidCallback voidCb = new AsyncCallback.VoidCallback() {
-        @Override
-        public void processResult(int rc, String path, Object ctx) {
-            ConcurrencyUtils.put(queue, rc == Code.OK.intValue());
-        }
-    };
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        hostPort = "127.0.0.1:" + PortManager.nextFreePort();
-        super.setUp();
-        zk = createClient();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-        zk.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/resources/log4j.properties b/hedwig-server/src/test/resources/log4j.properties
deleted file mode 100644
index 5983f0b..0000000
--- a/hedwig-server/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,72 +0,0 @@
-#
-# 
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# 
-#
-
-#
-# Hedwig Logging Configuration
-#
-
-# Format is "<default threshold> (, <appender>)+
-
-# DEFAULT: console appender only
-log4j.rootLogger=INFO, CONSOLE
-
-# Example with rolling log file
-#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
-
-# Example with rolling log file and tracing
-#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
-
-#
-# Log INFO level and above messages to the console
-#
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=INFO
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-#
-# Add ROLLINGFILE to rootLogger to get log file output
-#    Log DEBUG level and above messages to a log file
-log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.ROLLINGFILE.Threshold=DEBUG
-log4j.appender.ROLLINGFILE.File=hedwig-server.log
-log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-# Max log file size of 10MB
-log4j.appender.ROLLINGFILE.MaxFileSize=10MB
-# uncomment the next line to limit number of backup files
-#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
-
-log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
-
-
-#
-# Add TRACEFILE to rootLogger to get log file output
-#    Log DEBUG level and above messages to a log file
-log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
-log4j.appender.TRACEFILE.Threshold=TRACE
-log4j.appender.TRACEFILE.File=hedwig_trace.log
-
-log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
-### Notice we are including log4j's NDC here (%x)
-log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index da6867d..3b8b233 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,10 +33,6 @@
   <inceptionYear>2011</inceptionYear>
   <modules>
     <module>compat-deps</module>
-    <module>hedwig-client</module>
-    <module>hedwig-server</module>
-    <module>hedwig-protocol</module>
-    <module>hedwig-client-jms</module>
     <module>bookkeeper-stats</module>
     <module>bookkeeper-server</module>
     <module>bookkeeper-benchmark</module>
@@ -87,17 +83,12 @@
         <artifactId>maven-javadoc-plugin</artifactId>
         <version>2.8</version>
         <configuration>
-          <additionalparam>-exclude org.apache.hedwig.client.netty:org.apache.hedwig.client.benchmark:org.apache.hedwig.client.data:org.apache.hedwig.client.exceptions:org.apache.hedwig.client.handlers:org.apache.hedwig.client.ssl</additionalparam>
-          <subpackages>org.apache.bookkeeper.client:org.apache.bookkeeper.conf:org.apache.hedwig.client:org.apache.hedwig.util:org.apache.hedwig.protocol:org.apache.hedwig.exceptions</subpackages>
+          <subpackages>org.apache.bookkeeper.client:org.apache.bookkeeper.conf</subpackages>
           <groups>
             <group>
               <title>Bookkeeper</title>
               <packages>org.apache.bookkeeper*</packages>
             </group>
-            <group>
-              <title>Hedwig</title>
-              <packages>org.apache.hedwig*</packages>
-            </group>
           </groups>
         </configuration>
         <executions>


Mime
View raw message