Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8DE02CD56 for ; Thu, 17 May 2012 16:32:33 +0000 (UTC) Received: (qmail 80701 invoked by uid 500); 17 May 2012 16:32:33 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 80677 invoked by uid 500); 17 May 2012 16:32:33 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 80664 invoked by uid 99); 17 May 2012 16:32:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 May 2012 16:32:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 May 2012 16:32:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B19D82388962 for ; Thu, 17 May 2012 16:32:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1339691 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/util/ hedwig-protocol... Date: Thu, 17 May 2012 16:32:06 -0000 To: commits@zookeeper.apache.org From: ivank@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120517163208.B19D82388962@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ivank Date: Thu May 17 16:32:05 2012 New Revision: 1339691 URL: http://svn.apache.org/viewvc?rev=1339691&view=rev Log: BOOKKEEPER-72: Fix warnings issued by FindBugs (ivank) Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java zookeeper/bookkeeper/trunk/hedwig-server/pom.xml zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu May 17 16:32:05 2012 @@ -14,6 +14,8 @@ Trunk (unreleased changes) BOOKKEEPER-254: Bump zookeeper version in poms (ivank) + BOOKKEEPER-72: Fix warnings issued by FindBugs (ivank) + bookkeeper-server/ BOOKKEEPER-142: Parsing last log id is wrong, which may make entry log files overwritten (Sijie Gou via ivank) Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Thu May 17 16:32:05 2012 @@ -61,6 +61,8 @@ public class BenchmarkPublisher extends // picking constants arbitarily for warmup phase ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", nWarmup, 100); + agg.startProgress(); + Message msg = getMsg(1024); for (int i = 0; i < nWarmup; i++) { publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null); @@ -100,6 +102,7 @@ public class BenchmarkPublisher extends int myPublishLimit = numMessages / numRegions / numPartitions - myPublishCount; myPublishCount = 0; ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", myPublishLimit, nParallel); + agg.startProgress(); int topicLabel = 0; Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Thu May 17 16:32:05 2012 @@ -59,6 +59,8 @@ public class BenchmarkSubscriber extends public Void call() throws Exception { final ThroughputAggregator agg = new ThroughputAggregator("recvs", numMessages); + agg.startProgress(); + final Map lastSeqIdSeenMap = new HashMap(); for (int i = startTopicLabel; i < startTopicLabel + numTopics; i++) { @@ -120,6 +122,8 @@ public class BenchmarkSubscriber extends throws InterruptedException { long startTime = System.currentTimeMillis(); ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator(label, count / numPartitions, npar); + agg.startProgress(); + int end = start + count; for (int i = start; i < end; ++i) { if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) { Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java Thu May 17 16:32:05 2012 @@ -60,6 +60,10 @@ public class BenchmarkUtils { } } + public void startProgress() { + tpAgg.startProgress(); + } + public void reportLatency(long latency) { sum.addAndGet(latency); @@ -101,6 +105,7 @@ public class BenchmarkUtils { final AtomicInteger done = new AtomicInteger(); final AtomicLong earliest = new AtomicLong(); final AtomicInteger numFailed = new AtomicInteger(); + final Thread progressThread; final LinkedBlockingQueue queue = new LinkedBlockingQueue(); public ThroughputAggregator(final String label, final int count) { @@ -109,7 +114,7 @@ public class BenchmarkUtils { if (count == 0) queue.add(0); if (Boolean.getBoolean("progress")) { - new Thread(new Runnable() { + progressThread = new Thread(new Runnable() { @Override public void run() { try { @@ -123,7 +128,15 @@ public class BenchmarkUtils { throw new RuntimeException(ex); } } - }).start(); + }); + } else { + progressThread = null; + } + } + + public void startProgress() { + if (progressThread != null) { + progressThread.start(); } } Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Thu May 17 16:32:05 2012 @@ -78,7 +78,7 @@ public class SubscribeResponseHandler { // Public getter to retrieve the original PubSubData used for the Subscribe // request. - public PubSubData getOrigSubData() { + synchronized public PubSubData getOrigSubData() { return origSubData; } @@ -98,35 +98,37 @@ public class SubscribeResponseHandler { + HedwigClientImpl.getHostFromChannel(channel)); switch (response.getStatusCode()) { case SUCCESS: - // For successful Subscribe requests, store this Channel locally - // and set it to not be readable initially. - // This way we won't be delivering messages for this topic - // subscription until the client explicitly says so. - subscribeChannel = channel; - subscribeChannel.setReadable(false); - // Store the original PubSubData used to create this successful - // Subscribe request. - origSubData = pubSubData; - // Store the mapping for the TopicSubscriber to the Channel. - // This is so we can control the starting and stopping of - // message deliveries from the server on that Channel. Store - // this only on a successful ack response from the server. - TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId); - responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel); - // Lazily create the Set (from a concurrent hashmap) to keep track - // of outstanding Messages to be consumed by the client app. At this - // stage, delivery for that topic hasn't started yet so creation of - // this Set should be thread safe. We'll create the Set with an initial - // capacity equal to the configured parameter for the maximum number of - // outstanding messages to allow. The load factor will be set to - // 1.0f which means we'll only rehash and allocate more space if - // we ever exceed the initial capacity. That should be okay - // because when that happens, things are slow already and piling - // up on the client app side to consume messages. - - outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap( - responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f)); - + synchronized(this) { + // For successful Subscribe requests, store this Channel locally + // and set it to not be readable initially. + // This way we won't be delivering messages for this topic + // subscription until the client explicitly says so. + subscribeChannel = channel; + subscribeChannel.setReadable(false); + // Store the original PubSubData used to create this successful + // Subscribe request. + origSubData = pubSubData; + + // Store the mapping for the TopicSubscriber to the Channel. + // This is so we can control the starting and stopping of + // message deliveries from the server on that Channel. Store + // this only on a successful ack response from the server. + TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId); + responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel); + // Lazily create the Set (from a concurrent hashmap) to keep track + // of outstanding Messages to be consumed by the client app. At this + // stage, delivery for that topic hasn't started yet so creation of + // this Set should be thread safe. We'll create the Set with an initial + // capacity equal to the configured parameter for the maximum number of + // outstanding messages to allow. The load factor will be set to + // 1.0f which means we'll only rehash and allocate more space if + // we ever exceed the initial capacity. That should be okay + // because when that happens, things are slow already and piling + // up on the client app side to consume messages. + outstandingMsgSet = Collections.newSetFromMap( + new ConcurrentHashMap( + responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f)); + } // Response was success so invoke the callback's operationFinished // method. pubSubData.callback.operationFinished(pubSubData.context, null); @@ -162,9 +164,11 @@ public class SubscribeResponseHandler { // Main method to handle consuming a message for a topic that the client is // subscribed to. public void handleSubscribeMessage(PubSubResponse response) { - if (logger.isDebugEnabled()) - logger.debug("Handling a Subscribe message in response: " + response + ", topic: " - + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8()); + if (logger.isDebugEnabled()) { + logger.debug("Handling a Subscribe message in response: {}, topic: {}, subscriberId: {}", + new Object[] { response, getOrigSubData().topic.toStringUtf8(), + getOrigSubData().subscriberId.toStringUtf8() }); + } Message message = response.getMessage(); synchronized (this) { @@ -300,9 +304,11 @@ public class SubscribeResponseHandler { * MessageHandler to register for this ResponseHandler instance. */ public void setMessageHandler(MessageHandler messageHandler) { - if (logger.isDebugEnabled()) - logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8() - + ", subscriberId: " + origSubData.subscriberId.toStringUtf8()); + if (logger.isDebugEnabled()) { + logger.debug("Setting the messageHandler for topic: {}, subscriberId: {}", + getOrigSubData().topic.toStringUtf8(), + getOrigSubData().subscriberId.toStringUtf8()); + } synchronized (this) { this.messageHandler = messageHandler; // Once the MessageHandler is registered, see if we have any queued up Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java Thu May 17 16:32:05 2012 @@ -124,6 +124,11 @@ public class HedwigSocketAddress { return (this.hostname.equals(that.hostname) && (this.port == that.port) && (this.sslPort == that.sslPort)); } + @Override + public int hashCode() { + return (this.hostname + this.port + this.sslPort).hashCode(); + } + // Static helper method to return the string representation for an // InetSocketAddress. The HedwigClient can only operate in SSL or non-SSL // mode. So the server hosts it connects to will just be an Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java Thu May 17 16:32:05 2012 @@ -26,12 +26,12 @@ public class PathUtils { /** Generate all prefixes for a path. "/a/b/c" -> ["/a","/a/b","/a/b/c"] */ public static List prefixes(String path) { List prefixes = new ArrayList(); - String prefix = ""; + StringBuilder prefix = new StringBuilder(); for (String comp : path.split("/+")) { // Skip the first (empty) path component. if (!comp.equals("")) { - prefix += "/" + comp; - prefixes.add(prefix); + prefix.append("/").append(comp); + prefixes.add(prefix.toString()); } } return prefixes; Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java Thu May 17 16:32:05 2012 @@ -27,7 +27,7 @@ public class PubSubResponseUtils { /** * Change here if bumping up the version number that the server sends back */ - protected static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE; + protected final static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE; static PubSubResponse.Builder getBasicBuilder(StatusCode status) { return PubSubResponse.newBuilder().setProtocolVersion(serverVersion).setStatusCode(status); Modified: zookeeper/bookkeeper/trunk/hedwig-server/pom.xml URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/pom.xml?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/pom.xml (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/pom.xml Thu May 17 16:32:05 2012 @@ -137,31 +137,13 @@ - maven-antrun-plugin Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java Thu May 17 16:32:05 2012 @@ -18,6 +18,7 @@ package org.apache.hedwig.admin; +import java.util.Arrays; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -65,7 +66,7 @@ public class HedwigAdmin { protected ClientConfiguration bkClientConf; // Empty watcher - private class MyWatcher implements Watcher { + private static class MyWatcher implements Watcher { public void process(WatchedEvent event) { } } @@ -140,7 +141,7 @@ public class HedwigAdmin { * @return bookeeper passwd */ public byte[] getBkPasswd() { - return passwd; + return Arrays.copyOf(passwd, passwd.length); } /** @@ -187,8 +188,10 @@ public class HedwigAdmin { if (data != null) { load = Integer.parseInt(new String(data)); } - } catch (Exception e) { - // igore + } catch (KeeperException ke) { + LOG.warn("Couldn't read hub data from ZooKeeper", ke); + } catch (InterruptedException ie) { + LOG.warn("Interrupted during read", ie); } hubs.put(new HedwigSocketAddress(host), load); } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java Thu May 17 16:32:05 2012 @@ -336,8 +336,6 @@ public final class HedwigCommands { public String getDescription() { return desc; } - public String[] getUsage() { return usage; } - public Map getSubCommands() { return subCmds; } public void addSubCommand(COMMAND c) { @@ -364,7 +362,10 @@ public final class HedwigCommands { commands.put(c.getName(), c); } - static { + static synchronized void init() { + if (commands != null) { + return; + } commands = new LinkedHashMap(); addCommand(COMMAND.CMD_PUB); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Thu May 17 16:32:05 2012 @@ -34,7 +34,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.hedwig.admin.HedwigAdmin; @@ -100,7 +101,7 @@ public class HedwigConsole { boolean runCmd(String[] args) throws Exception; } - class HelpCmd implements MyCommand { + static class HelpCmd implements MyCommand { @Override public boolean runCmd(String[] args) throws Exception { @@ -127,7 +128,7 @@ public class HedwigConsole { printMessage("Quitting ..."); hubClient.close(); admin.close(); - System.exit(0); + Runtime.getRuntime().exit(0); return true; } } @@ -223,7 +224,7 @@ public class HedwigConsole { } - class ConsoleMessageHandler implements MessageHandler { + static class ConsoleMessageHandler implements MessageHandler { @Override public void deliver(ByteString topic, ByteString subscriberId, @@ -444,7 +445,7 @@ public class HedwigConsole { boolean subscribed = false; boolean success = false; - final AtomicBoolean isDone = new AtomicBoolean(false); + final CountDownLatch isDone = new CountDownLatch(1); long elapsedTime = 0L; System.out.println("Starting PUBSUB test ..."); @@ -471,10 +472,7 @@ public class HedwigConsole { if (thisTopic.equals(topic) && subscriberId.equals(subId) && msg.getBody().equals(message.getBody())) { System.out.println("Received message : " + message.getBody().toStringUtf8()); - synchronized(isDone) { - isDone.set(true); - isDone.notify(); - } + isDone.countDown(); } callback.operationFinished(context, null); } @@ -482,10 +480,7 @@ public class HedwigConsole { }); // wait for the message - synchronized (isDone) { - isDone.wait(timeoutSecs * 1000L); - } - success = isDone.get(); + success = isDone.await(timeoutSecs, TimeUnit.SECONDS); elapsedTime = System.currentTimeMillis() - startTime; } finally { try { @@ -783,6 +778,7 @@ public class HedwigConsole { * @throws InterruptedException */ public HedwigConsole(String[] args) throws IOException, InterruptedException { + HedwigCommands.init(); cl.parseOptions(args); if (cl.getCommand() == null) { @@ -970,10 +966,26 @@ public class HedwigConsole { historyEnabled = true; System.out.println("JLine history support is enabled"); - } catch (Exception e) { + } catch (ClassNotFoundException e) { + System.out.println("JLine history support is disabled"); + LOG.debug("JLine history disabled with exception", e); historyEnabled = false; - e.printStackTrace(); + } catch (NoSuchMethodException e) { + System.out.println("JLine history support is disabled"); + LOG.debug("JLine history disabled with exception", e); + historyEnabled = false; + } catch (InvocationTargetException e) { System.out.println("JLine history support is disabled"); + LOG.debug("JLine history disabled with exception", e); + historyEnabled = false; + } catch (IllegalAccessException e) { + System.out.println("JLine history support is disabled"); + LOG.debug("JLine history disabled with exception", e); + historyEnabled = false; + } catch (InstantiationException e) { + System.out.println("JLine history support is disabled"); + LOG.debug("JLine history disabled with exception", e); + historyEnabled = false; } String line; Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java Thu May 17 16:32:05 2012 @@ -72,7 +72,6 @@ public class ReadTopic { static final int NUM_MESSAGES_TO_PRINT = 15; SortedMap ledgers = new TreeMap(); - SubscriptionState leastSubscriber = null; static class InMemoryLedgerRange { LedgerRange range; @@ -152,7 +151,6 @@ public class ReadTopic { long localMsgId = state.getMsgId().getLocalComponent(); if (localMsgId < leastConsumedSeqId) { leastConsumedSeqId = localMsgId; - this.leastSubscriber = state; } } if (leastConsumedSeqId == Long.MAX_VALUE) { Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java Thu May 17 16:32:05 2012 @@ -26,7 +26,7 @@ public class TerminateJVMExceptionHandle @Override public void uncaughtException(Thread t, Throwable e) { logger.error("Uncaught exception in thread " + t.getName(), e); - System.exit(1); + Runtime.getRuntime().exit(1); } } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java Thu May 17 16:32:05 2012 @@ -78,4 +78,8 @@ public class ChannelEndPoint implements } } + @Override + public int hashCode() { + return channel.hashCode(); + } } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Thu May 17 16:32:05 2012 @@ -22,6 +22,8 @@ import org.apache.hedwig.protocol.PubSub import org.apache.hedwig.server.subscriptions.MessageFilter; public interface DeliveryManager { + public void start(); + public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom, DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu May 17 16:32:05 2012 @@ -89,15 +89,20 @@ public class FIFODeliveryManager impleme // Boolean indicating if this thread should continue running. This is used // when we want to stop the thread during a PubSubServer shutdown. protected boolean keepRunning = true; + private final Thread workerThread; public FIFODeliveryManager(PersistenceManager persistenceMgr, ServerConfiguration cfg) { this.persistenceMgr = persistenceMgr; perTopicDeliveryPtrs = new HashMap>>(); subscriberStates = new HashMap(); - new Thread(this, "DeliveryManagerThread").start(); + workerThread = new Thread(this, "DeliveryManagerThread"); this.cfg = cfg; } + public void start() { + workerThread.start(); + } + /** * ===================================================================== Our * usual enqueue function, stop if error because of unbounded queue, should @@ -296,7 +301,7 @@ public class FIFODeliveryManager impleme long localSeqIdDeliveringNow; long lastSeqIdCommunicatedExternally; // TODO make use of these variables - MessageFilter filter; + boolean isHubSubscriber; final static int SEQ_ID_SLACK = 10; @@ -306,7 +311,7 @@ public class FIFODeliveryManager impleme this.subscriberId = subscriberId; this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered; this.deliveryEndPoint = deliveryEndPoint; - this.filter = filter; + this.isHubSubscriber = isHubSubscriber; } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java Thu May 17 16:32:05 2012 @@ -30,7 +30,7 @@ public class NettyHandlerBean implements public NettyHandlerBean(Map handlers) { this.handlers = handlers; - subHandler = (SubscribeHandler) handlers.get(OperationType.SUBSCRIBE); + subHandler = (SubscribeHandler) this.handlers.get(OperationType.SUBSCRIBE); } @Override Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Thu May 17 16:32:05 2012 @@ -107,6 +107,7 @@ public class PubSubServer { // JMX Beans NettyHandlerBean jmxNettyBean; PubSubServerBean jmxServerBean; + final ThreadGroup tg; protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException, InterruptedException { @@ -311,8 +312,9 @@ public class PubSubServer { * @throws InterruptedException * @throws ConfigurationException */ - public PubSubServer(final ServerConfiguration conf, final Thread.UncaughtExceptionHandler exceptionHandler) - throws Exception { + public PubSubServer(final ServerConfiguration conf, + final Thread.UncaughtExceptionHandler exceptionHandler) + throws ConfigurationException { // First validate the conf this.conf = conf; @@ -320,7 +322,7 @@ public class PubSubServer { // We need a custom thread group, so that we can override the uncaught // exception method - ThreadGroup tg = new ThreadGroup("hedwig") { + tg = new ThreadGroup("hedwig") { @Override public void uncaughtException(Thread t, Throwable e) { exceptionHandler.uncaughtException(t, e); @@ -330,7 +332,9 @@ public class PubSubServer { // we do in ZK threads throws an exception, we want our handler to be // called, not theirs. SafeAsyncCallback.setUncaughtExceptionHandler(exceptionHandler); + } + public void start() throws Exception { final SynchronousQueue> queue = new SynchronousQueue>(); new Thread(tg, new Runnable() { @@ -349,6 +353,8 @@ public class PubSubServer { tm = instantiateTopicManager(); pm = instantiatePersistenceManager(tm); dm = new FIFODeliveryManager(pm, conf); + dm.start(); + sm = instantiateSubscriptionManager(tm, pm); rm = instantiateRegionManager(pm, scheduler); sm.addListener(rm); @@ -422,7 +428,7 @@ public class PubSubServer { logger.info("Using configuration file " + confFile); } try { - new PubSubServer(conf); + new PubSubServer(conf).start(); } catch (Throwable t) { errorMsgAndExit("Error during startup", t, RC_OTHER); } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java Thu May 17 16:32:05 2012 @@ -121,7 +121,7 @@ public class ServerStats { ++latencyBuckets[bucket]; } - public OpStatData toOpStatData() { + synchronized public OpStatData toOpStatData() { double avgLatency = numSuccessOps > 0 ? totalLatency / numSuccessOps : 0.0f; StringBuilder sb = new StringBuilder(); for (int i=0; i threadLocalDigest = new ThreadLocal() { + @Override + protected MessageDigest initialValue() { + try { + return MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + logger.error("Could not find MD5 hash", e); + return null; + } + } + }; static final String ID_FIELD_NAME = "id"; static final String MSG_FIELD_NAME = "msg"; static final String driver = "org.apache.derby.jdbc.EmbeddedDriver"; @@ -246,14 +262,22 @@ public class LocalDBPersistenceManager i * sneak in and create the table before us */ private void createTable(Connection conn, ByteString topic) { - + Statement stmt = null; try { - Statement stmt = conn.createStatement(); + stmt = conn.createStatement(); String tableName = getTableNameForTopic(topic); stmt.execute("CREATE TABLE " + tableName + " (" + ID_FIELD_NAME + " BIGINT NOT NULL CONSTRAINT ID_PK_" - + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)"); + + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)"); } catch (SQLException e) { logger.debug("Could not create table", e); + } finally { + try { + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + logger.error("Error closing statement", e); + } } } @@ -274,7 +298,11 @@ public class LocalDBPersistenceManager i } private String getTableNameForTopic(ByteString topic) { - return (topic.toStringUtf8() + "_" + version); + String src = (topic.toStringUtf8() + "_" + version); + threadLocalDigest.get().reset(); + byte[] digest = threadLocalDigest.get().digest(src.getBytes()); + BigInteger bigInt = new BigInteger(1,digest); + return String.format("TABLE_%032X", bigInt); } private void scanMessagesInternal(ByteString topic, long startSeqId, int messageLimit, long sizeLimit, @@ -290,7 +318,7 @@ public class LocalDBPersistenceManager i long currentSeqId; currentSeqId = startSeqId; - PreparedStatement stmt; + PreparedStatement stmt = null; try { try { stmt = conn.prepareStatement("SELECT * FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME @@ -367,8 +395,15 @@ public class LocalDBPersistenceManager i logger.error("Message stored in derby is not parseable", e); callback.scanFailed(ctx, new ServiceDownException(e)); return; + } finally { + try { + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + logger.error("Error closing statement", e); + } } - } public void deliveredUntil(ByteString topic, Long seqId) { @@ -381,20 +416,27 @@ public class LocalDBPersistenceManager i logger.error("Not connected to derby"); return; } - PreparedStatement stmt; + PreparedStatement stmt = null; try { stmt = conn.prepareStatement("DELETE FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME + " <= ?"); stmt.setLong(1, seqId); int rowCount = stmt.executeUpdate(); logger.debug("Deleted " + rowCount + " records for topic: " + topic.toStringUtf8() + ", seqId: " + seqId); - stmt.close(); } catch (SQLException sqle) { String theError = (sqle).getSQLState(); if (theError.equals("42X05")) { logger.warn("Table for topic (" + topic + ") does not exist so no consumed messages to delete!"); } else logger.error("Error while executing derby delete for consumed messages", sqle); + } finally { + try { + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + logger.error("Error closing statement", e); + } } } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Thu May 17 16:32:05 2012 @@ -458,7 +458,7 @@ public class ReadAheadCache implements P } protected static class HashSetCacheKeyFactory implements Factory> { - protected static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory(); + protected final static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory(); public Set newInstance() { return new HashSet(); @@ -466,7 +466,7 @@ public class ReadAheadCache implements P } protected static class TreeSetLongFactory implements Factory> { - protected static TreeSetLongFactory instance = new TreeSetLongFactory(); + protected final static TreeSetLongFactory instance = new TreeSetLongFactory(); public SortedSet newInstance() { return new TreeSet(); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Thu May 17 16:32:05 2012 @@ -53,18 +53,24 @@ public class HedwigProxy { Map handlers; ProxyConfiguration cfg; ChannelTracker tracker; + ThreadGroup tg; - public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler) - throws InterruptedException { + public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler) { this.cfg = cfg; - ThreadGroup tg = new ThreadGroup("hedwigproxy") { + tg = new ThreadGroup("hedwigproxy") { @Override public void uncaughtException(Thread t, Throwable e) { exceptionHandler.uncaughtException(t, e); } }; + } + + public HedwigProxy(ProxyConfiguration conf) throws InterruptedException { + this(conf, new TerminateJVMExceptionHandler()); + } + public void start() throws InterruptedException { final LinkedBlockingQueue queue = new LinkedBlockingQueue(); new Thread(tg, new Runnable() { @@ -84,10 +90,6 @@ public class HedwigProxy { queue.take(); } - public HedwigProxy(ProxyConfiguration conf) throws InterruptedException { - this(conf, new TerminateJVMExceptionHandler()); - } - // used for testing public ChannelTracker getChannelTracker() { return tracker; @@ -161,7 +163,7 @@ public class HedwigProxy { logger.info("Using configuration file " + confFile); } try { - new HedwigProxy(conf); + new HedwigProxy(conf).start(); } catch (Throwable t) { PubSubServer.errorMsgAndExit("Error during startup", t, PubSubServer.RC_OTHER); } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java Thu May 17 16:32:05 2012 @@ -21,8 +21,8 @@ import org.apache.hedwig.client.conf.Cli public class ProxyConfiguration extends ClientConfiguration { - protected static String PROXY_PORT = "proxy_port"; - protected static String MAX_MESSAGE_SIZE = "max_message_size"; + protected final static String PROXY_PORT = "proxy_port"; + protected final static String MAX_MESSAGE_SIZE = "max_message_size"; public int getProxyPort() { return conf.getInt(PROXY_PORT, 9099); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Thu May 17 16:32:05 2012 @@ -26,7 +26,7 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +93,7 @@ public class RegionManager implements Su if (null == topics || topics.isEmpty()) { continue; } - final AtomicBoolean done = new AtomicBoolean(false); + final CountDownLatch done = new CountDownLatch(1); Callback postCb = new Callback() { @Override public void operationFinished(Object ctx, @@ -106,10 +106,7 @@ public class RegionManager implements Su finish(); } void finish() { - synchronized (done) { - done.set(true); - done.notifyAll(); - } + done.countDown(); } }; Callback mcb = CallbackUtils.multiCallback(topics.size(), postCb, null); @@ -122,14 +119,10 @@ public class RegionManager implements Su } retrySubscribe(client, topic, mcb); } - synchronized (done) { - if (done.get()) { - try { - done.wait(); - } catch (InterruptedException e) { - LOGGER.warn("Exception during retrying remote subscriptions : ", e); - } - } + try { + done.await(); + } catch (InterruptedException e) { + LOGGER.warn("Exception during retrying remote subscriptions : ", e); } } } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Thu May 17 16:32:05 2012 @@ -71,7 +71,7 @@ public abstract class AbstractSubscripti Callback noopCallback = new NoopCallback(); - class NoopCallback implements Callback { + static class NoopCallback implements Callback { @Override public void operationFailed(Object ctx, PubSubException exception) { logger.warn("Exception found in AbstractSubscriptionManager : ", exception); @@ -110,6 +110,10 @@ public abstract class AbstractSubscripti // so it should be safe to run this fairly often. for (ByteString topic : top2sub2seq.keySet()) { final Map topicSubscriptions = top2sub2seq.get(topic); + if (topicSubscriptions == null) { + continue; + } + long minConsumedMessage = Long.MAX_VALUE; boolean hasBound = true; // Loop through all subscribers to the current topic to find the @@ -126,9 +130,9 @@ public abstract class AbstractSubscripti // Don't call the PersistenceManager if nobody is subscribed to // the topic yet, or the consume pointer has not changed since // the last time, or if this is the initial subscription. + Long minConsumedFromMap = topic2MinConsumedMessagesMap.get(topic); if (topicSubscriptions.isEmpty() - || (topic2MinConsumedMessagesMap.containsKey(topic) - && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage) + || (minConsumedFromMap != null && minConsumedFromMap.equals(minConsumedMessage)) || minConsumedMessage == 0) { topic2MinConsumedMessagesMap.put(topic, minConsumedMessage); pm.consumedUntil(topic, minConsumedMessage); @@ -404,7 +408,8 @@ public abstract class AbstractSubscripti // so the following codes only happened when remote subscription failed. // it is safe to decrement the local count so next subscribe op // could have the chance to subscribe remote. - topic2LocalCounts.get(topic).decrementAndGet(); + AtomicInteger count = topic2LocalCounts.get(topic); + if (count != null) { count.decrementAndGet(); } } cb.operationFailed(ctx, exception); } @@ -422,8 +427,10 @@ public abstract class AbstractSubscripti }; + AtomicInteger count = topic2LocalCounts.get(topic); if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId()) - && topic2LocalCounts.get(topic).incrementAndGet() == 1) + && count != null + && count.incrementAndGet() == 1) notifySubscribe(topic, subRequest.getSynchronous(), cb2, ctx); else cb2.operationFinished(ctx, resultOfOperation); @@ -537,8 +544,9 @@ public abstract class AbstractSubscripti public void operationFinished(Object ctx, Void resultOfOperation) { topicSubscriptions.remove(subscriberId); // Notify listeners if necessary. + AtomicInteger count = topic2LocalCounts.get(topic); if (!SubscriptionStateUtils.isHubSubscriber(subscriberId) - && topic2LocalCounts.get(topic).decrementAndGet() == 0) + && count != null && count.decrementAndGet() == 0) notifyUnsubcribe(topic); updateMessageBound(topic); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java Thu May 17 16:32:05 2012 @@ -20,7 +20,7 @@ package org.apache.hedwig.server.subscri import org.apache.hedwig.protocol.PubSubProtocol.Message; public class TrueFilter implements MessageFilter { - protected static TrueFilter instance = new TrueFilter(); + protected final static TrueFilter instance = new TrueFilter(); public static TrueFilter instance() { return instance; Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Thu May 17 16:32:05 2012 @@ -103,7 +103,7 @@ public class ZkTopicManager extends Abst // Check for expired connection. if (event.getState().equals(Watcher.Event.KeeperState.Expired)) { logger.error("ZK client connection to the ZK server has expired!"); - System.exit(1); + Runtime.getRuntime().exit(1); } } }); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java Thu May 17 16:32:05 2012 @@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptio import org.apache.hedwig.server.common.TerminateJVMExceptionHandler; public class SafeAsyncCallback { - protected static UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler(); + static UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler(); public static void setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) { SafeAsyncCallback.uncaughtExceptionHandler = uncaughtExceptionHandler; Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml?rev=1339691&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml Thu May 17 16:32:05 2012 @@ -0,0 +1,25 @@ + + + + + + + + + Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java Thu May 17 16:32:05 2012 @@ -118,7 +118,10 @@ public abstract class HedwigHubTestBase // Now create the PubSubServer Hubs serversList = new LinkedList(); for (int i = 0; i < numServers; i++) { - serversList.add(new PubSubServer(getServerConfiguration(initialServerPort + i, initialSSLServerPort + i))); + PubSubServer s = new PubSubServer( + getServerConfiguration(initialServerPort + i, initialSSLServerPort + i)); + serversList.add(s); + s.start(); } } protected void stopHubServers() throws Exception { Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java Thu May 17 16:32:05 2012 @@ -232,9 +232,13 @@ public abstract class HedwigRegionTestBa // servers. We will basically increment through the port numbers // starting from the initial ones defined. for (int j = 0; j < numServersPerRegion; j++) { - serversList.add(new PubSubServer(getServerConfiguration(initialServerPort - + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion), - regionName))); + PubSubServer s = new PubSubServer( + getServerConfiguration(initialServerPort + + (j + i * numServersPerRegion), + initialSSLServerPort + (j + i * numServersPerRegion), + regionName)); + serversList.add(s); + s.start(); } // Store this list of servers created for the current region regionServersMap.put(regionName, serversList); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java Thu May 17 16:32:05 2012 @@ -48,6 +48,7 @@ public abstract class PubSubServerStandA public void setUp() throws Exception { logger.info("STARTING " + getName()); server = new PubSubServer(new StandAloneServerConfiguration()); + server.start(); logger.info("Standalone PubSubServer test setup finished"); } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java Thu May 17 16:32:05 2012 @@ -96,6 +96,7 @@ public class TestPubSubServerStartup { try { logger.info("starting hedwig broker!"); hedwigServer = new PubSubServer(serverConf); + hedwigServer.start(); } catch (Exception e) { e.printStackTrace(); } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Thu May 17 16:32:05 2012 @@ -64,6 +64,10 @@ public class StubDeliveryManager impleme } @Override + public void start() { + } + + @Override public void stop() { // do nothing now } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Thu May 17 16:32:05 2012 @@ -212,6 +212,7 @@ public class TestHedwigHub extends Hedwi super.setUp(); if (mode == Mode.PROXY) { proxy = new HedwigProxy(proxyConf); + proxy.start(); } client = new HedwigClient(getClientConfiguration()); publisher = client.getPublisher(); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java?rev=1339691&r1=1339690&r2=1339691&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Thu May 17 16:32:05 2012 @@ -55,6 +55,7 @@ public class TestPubSubServer extends Pu return super.getServerPort() + 1; } }); + server1.start(); server1.shutdown(); } @@ -91,7 +92,7 @@ public class TestPubSubServer extends Pu return instantiator.instantiateTopicManager(); } }; - + server.start(); return server; }