Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 572F8192E6 for ; Thu, 31 Mar 2016 02:30:57 +0000 (UTC) Received: (qmail 98488 invoked by uid 500); 31 Mar 2016 02:30:56 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 98208 invoked by uid 500); 31 Mar 2016 02:30:56 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 93993 invoked by uid 99); 31 Mar 2016 02:30:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 Mar 2016 02:30:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C0759E97D8; Thu, 31 Mar 2016 02:30:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 31 Mar 2016 02:31:33 -0000 Message-Id: <0903849d36ae4c4c9865166dc6cff923@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [51/69] [abbrv] activemq-artemis git commit: cleanup on consumer cleanup on consumer Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dc03f8f7 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dc03f8f7 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dc03f8f7 Branch: refs/heads/refactor-openwire Commit: dc03f8f7795517c872d1eedc27a9e8a47ed6dba3 Parents: 6868127 Author: Clebert Suconic Authored: Thu Mar 17 19:28:38 2016 -0400 Committer: Clebert Suconic Committed: Wed Mar 30 22:29:44 2016 -0400 ---------------------------------------------------------------------- .../plug/ProtonSessionIntegrationCallback.java | 5 ++ .../core/protocol/mqtt/MQTTSessionCallback.java | 5 ++ .../protocol/openwire/OpenWireConnection.java | 1 - .../core/protocol/openwire/amq/AMQConsumer.java | 12 +-- .../openwire/amq/AMQConsumerBrokerExchange.java | 9 +- .../openwire/amq/AMQServerConsumer.java | 90 -------------------- .../protocol/openwire/amq/AMQServerSession.java | 2 +- .../core/protocol/openwire/amq/AMQSession.java | 8 ++ .../protocol/openwire/amq/BrowserListener.java | 22 ----- .../core/protocol/stomp/StompSession.java | 5 ++ .../protocol/core/impl/CoreSessionCallback.java | 4 + .../core/server/impl/ServerConsumerImpl.java | 1 + .../spi/core/protocol/SessionCallback.java | 3 + .../integration/client/HangConsumerTest.java | 5 ++ 14 files changed, 41 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 12aad22..5d6af2a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -92,6 +92,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override + public void browserFinished(ServerConsumer consumer) { + + } + + @Override public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception { this.protonSession = protonSession; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 356dc73..28d86b8 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -83,6 +83,11 @@ public class MQTTSessionCallback implements SessionCallback { @Override + public void browserFinished(ServerConsumer consumer) { + + } + + @Override public boolean hasCredits(ServerConsumer consumerID) { return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index b872e63..17f26b0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -438,7 +438,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap); } synchronized (consumerExchanges) { - result.setConnectionContext(context); consumerExchanges.put(id, result); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 8e7ff49..d296213 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -46,7 +46,7 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.TransactionId; import org.apache.activemq.wireformat.WireFormat; -public class AMQConsumer implements BrowserListener { +public class AMQConsumer { private AMQSession session; private org.apache.activemq.command.ActiveMQDestination actualDest; @@ -137,7 +137,8 @@ public class AMQConsumer implements BrowserListener { } else { SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName()); - coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); + AMQServerConsumer serverConsumer = (AMQServerConsumer)coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); + serverConsumer.setAmqConsumer(this); AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString()); if (addrSettings != null) { //see PolicyEntry @@ -150,12 +151,6 @@ public class AMQConsumer implements BrowserListener { } } } - - if (info.isBrowser()) { - AMQServerConsumer coreConsumer = coreSession.getConsumer(nativeId); - coreConsumer.setBrowserListener(this); - } - } public long getNativeId() { @@ -310,7 +305,6 @@ public class AMQConsumer implements BrowserListener { acquireCredit(n); } - @Override public void browseFinished() { MessageDispatch md = new MessageDispatch(); md.setConsumerId(info.getConsumerId()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java index c481618..21a45b1 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java @@ -22,18 +22,11 @@ import org.apache.activemq.command.MessagePull; public abstract class AMQConsumerBrokerExchange { protected final AMQSession amqSession; - private AMQConnectionContext connectionContext; - private boolean wildcard; public AMQConsumerBrokerExchange(AMQSession amqSession) { this.amqSession = amqSession; } - /** - * @param connectionContext the connectionContext to set - */ - public void setConnectionContext(AMQConnectionContext connectionContext) { - this.connectionContext = connectionContext; - } + public abstract void acknowledge(MessageAck ack) throws Exception; public abstract void processMessagePull(MessagePull messagePull) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index b37e1cf..2f9d0bc 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -23,8 +23,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.QueueImpl; @@ -36,7 +34,6 @@ public class AMQServerConsumer extends ServerConsumerImpl { // TODO-NOW: remove this once unified AMQConsumer amqConsumer; - boolean isClosing; public AMQConsumer getAmqConsumer() { return amqConsumer; @@ -64,93 +61,6 @@ public class AMQServerConsumer extends ServerConsumerImpl { super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); } - public void setBrowserListener(BrowserListener listener) { - AMQBrowserDeliverer newBrowserDeliverer = new AMQBrowserDeliverer(this.browserDeliverer); - newBrowserDeliverer.listener = listener; - this.browserDeliverer = newBrowserDeliverer; - } - - public void closing() { - isClosing = true; - } - - @Override - public HandleStatus handle(final MessageReference ref) throws Exception { - if (isClosing) { - return HandleStatus.BUSY; - } - return super.handle(ref); - } - - private class AMQBrowserDeliverer extends BrowserDeliverer { - - private BrowserListener listener = null; - - public AMQBrowserDeliverer(final BrowserDeliverer other) { - super(other.iterator); - } - - @Override - public synchronized void run() { - // if the reference was busy during the previous iteration, handle it now - if (current != null) { - try { - HandleStatus status = handle(current); - - if (status == HandleStatus.BUSY) { - return; - } - - if (status == HandleStatus.HANDLED) { - proceedDeliver(current); - } - - current = null; - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current); - return; - } - } - - MessageReference ref = null; - HandleStatus status; - - while (true) { - try { - ref = null; - synchronized (messageQueue) { - if (!iterator.hasNext()) { - //here we need to send a null for amq browsers - if (listener != null) { - listener.browseFinished(); - } - break; - } - - ref = iterator.next(); - - status = handle(ref); - } - - if (status == HandleStatus.HANDLED) { - proceedDeliver(ref); - } - else if (status == HandleStatus.BUSY) { - // keep a reference on the current message reference - // to handle it next time the browser deliverer is executed - current = ref; - break; - } - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref); - break; - } - } - } - } - public void amqPutBackToDeliveringList(final List refs) { synchronized (this.deliveringRefs) { for (MessageReference ref : refs) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java index b603257..3f0259d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java @@ -94,7 +94,7 @@ public class AMQServerSession extends ServerSessionImpl { Set consumersClone = new HashSet<>(consumers.values()); for (ServerConsumer consumer : consumersClone) { AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer; - amqConsumer.closing();//prevent redeliver + amqConsumer.setStarted(false); } synchronized (this) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index e3d2266..86ea582 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -165,6 +165,14 @@ public class AMQSession implements SessionCallback { } @Override + public void browserFinished(ServerConsumer consumer) { + AMQConsumer theConsumer = ((AMQServerConsumer)consumer).getAmqConsumer(); + if (theConsumer != null) { + theConsumer.browseFinished(); + } + } + + @Override public boolean isWritable(ReadyListener callback) { return connection.isWritable(callback); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java deleted file mode 100644 index 0e192db..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire.amq; - -interface BrowserListener { - - void browseFinished(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index e94e0bc..a6cbe71 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -113,6 +113,11 @@ public class StompSession implements SessionCallback { } @Override + public void browserFinished(ServerConsumer consumer) { + + } + + @Override public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { LargeServerMessageImpl largeMessage = null; ServerMessage newServerMessage = serverMessage; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 0b74fd7..c05a288 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -98,6 +98,10 @@ public final class CoreSessionCallback implements SessionCallback { channel.send(packet); } + @Override + public void browserFinished(ServerConsumer consumer) { + + } @Override public void afterDelivery() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index cb2cd38..298bf5f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -1213,6 +1213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref = null; synchronized (messageQueue) { if (!iterator.hasNext()) { + callback.browserFinished(ServerConsumerImpl.this); break; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index 4b27bc4..a9eb0f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -50,4 +50,7 @@ public interface SessionCallback { void disconnect(ServerConsumer consumerId, String queueName); boolean isWritable(ReadyListener callback); + + /** Some protocols (Openwire) needs a special message with the browser is finished. */ + void browserFinished(ServerConsumer consumer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc03f8f7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index d8aa4ac..54ae6c8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase { } @Override + public void browserFinished(ServerConsumer consumer) { + + } + + @Override public boolean isWritable(ReadyListener callback) { return true; }