Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CC2A819361 for ; Wed, 16 Mar 2016 15:21:34 +0000 (UTC) Received: (qmail 22487 invoked by uid 500); 16 Mar 2016 15:21:33 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 22399 invoked by uid 500); 16 Mar 2016 15:21:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 22188 invoked by uid 99); 16 Mar 2016 15:21:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2016 15:21:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F4239E016C; Wed, 16 Mar 2016 15:21:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 16 Mar 2016 15:22:10 -0000 Message-Id: In-Reply-To: <407517e763c6496895a78ae408224406@git.apache.org> References: <407517e763c6496895a78ae408224406@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [41/61] [abbrv] activemq-artemis git commit: Progress on refactoring - first pass: using AbstractConnection on OpenWireConnection and adding more TODOs Progress on refactoring - first pass: using AbstractConnection on OpenWireConnection and adding more TODOs Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e216548 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e216548 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e216548 Branch: refs/heads/refactor-openwire Commit: 0e21654829e6cdfaac117abe050d06d8b757ada8 Parents: 4829995 Author: Clebert Suconic Authored: Tue Feb 23 21:52:21 2016 -0500 Committer: Clebert Suconic Committed: Wed Mar 16 11:19:15 2016 -0400 ---------------------------------------------------------------------- .../protocol/AbstractRemotingConnection.java | 5 + .../protocol/openwire/AMQTransactionImpl.java | 14 +- .../protocol/openwire/OpenWireConnection.java | 1100 +++++++----------- .../openwire/OpenWireProtocolManager.java | 14 +- .../openwire/amq/AMQProducerBrokerExchange.java | 51 +- .../openwire/amq/AMQServerSessionFactory.java | 9 + .../core/protocol/openwire/amq/AMQSession.java | 13 +- .../openwire/impl/OpenWireServerCallback.java | 75 ++ 8 files changed, 560 insertions(+), 721 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index b759ccc..ee2449b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -104,6 +104,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { return transportConnection.getID(); } + + public String getLocalAddress() { + return transportConnection.getLocalAddress(); + } + @Override public String getRemoteAddress() { return transportConnection.getRemoteAddress(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java index e356522..bbd7e95 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java @@ -28,22 +28,10 @@ public class AMQTransactionImpl extends TransactionImpl { private boolean rollbackForClose = false; - public AMQTransactionImpl(StorageManager storageManager, int timeoutSeconds) { - super(storageManager, timeoutSeconds); - } - - public AMQTransactionImpl(StorageManager storageManager) { - super(storageManager); - } - public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) { super(xid, storageManager, timeoutSeconds); } - public AMQTransactionImpl(long id, Xid xid, StorageManager storageManager) { - super(id, xid, storageManager); - } - @Override public RefsOperation createRefsOperation(Queue queue) { return new AMQrefsOperation(queue, storageManager); @@ -55,6 +43,8 @@ public class AMQTransactionImpl extends TransactionImpl { super(queue, storageManager); } + + // This is because the Rollbacks happen through the consumer, not through the server's @Override public void afterRollback(Transaction tx) { if (rollbackForClose) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index a6f0f34..dbbb59f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,6 +40,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; +import org.apache.activemq.artemis.core.protocol.openwire.SendingResult; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; @@ -50,6 +54,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; @@ -101,32 +106,22 @@ import org.apache.activemq.wireformat.WireFormat; * Represents an activemq connection. * ToDo: extends AbstractRemotingConnection */ -public class OpenWireConnection implements RemotingConnection, CommandVisitor, SecurityAuth { +public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth { private final OpenWireProtocolManager protocolManager; - private final Connection transportConnection; - - private final long creationTime; - - private final List failureListeners = new CopyOnWriteArrayList<>(); - private final List closeListeners = new CopyOnWriteArrayList<>(); private boolean destroyed = false; private final Object sendLock = new Object(); - private volatile boolean dataReceived; - private final Acceptor acceptorUsed; private final OpenWireFormat wireFormat; private AMQConnectionContext context; - private boolean pendingStop; - private Throwable stopError = null; private final AtomicBoolean stopping = new AtomicBoolean(false); @@ -154,21 +149,16 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S public OpenWireConnection(Acceptor acceptorUsed, Connection connection, + Executor executor, OpenWireProtocolManager openWireProtocolManager, OpenWireFormat wf) { + super(connection, executor); this.protocolManager = openWireProtocolManager; - this.transportConnection = connection; this.acceptorUsed = acceptorUsed; this.wireFormat = wf; - this.creationTime = System.currentTimeMillis(); this.defaultSocketURIString = connection.getLocalAddress(); } - @Override - public boolean isWritable(ReadyListener callback) { - return transportConnection.isWritable(callback); - } - // SecurityAuth implementation @Override public String getUsername() { @@ -181,6 +171,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S // SecurityAuth implementation @Override + public RemotingConnection getRemotingConnection() { + return this; + } + + // SecurityAuth implementation + @Override public String getPassword() { ConnectionInfo info = getConnectionInfo(); if (info == null) { @@ -200,18 +196,15 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S return info; } - public String getLocalAddress() { - return transportConnection.getLocalAddress(); - } - @Override public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { + super.bufferReceived(connectionID, buffer); try { - dataReceived = true; + + // TODO-NOW: set OperationContext Command command = (Command) wireFormat.unmarshal(buffer); - //logger.log("got command: " + command); boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); // the connection handles pings, negotiations directly. @@ -223,36 +216,27 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S // for some reason KeepAliveInfo.isResponseRequired() is always false protocolManager.sendReply(this, info); } - else if (command.getClass() == WireFormatInfo.class) { - // amq here starts a read/write monitor thread (detect ttl?) - negotiate((WireFormatInfo) command); - } else { Response response = null; - if (pendingStop) { - response = new ExceptionResponse(this.stopError); + try { + setLastCommand(command); + response = command.visit(new CommandProcessor()); } - else { - try { - setLastCommand(command); - response = command.visit(this); - } - catch (Exception e) { - if (responseRequired) { - response = new ExceptionResponse(e); - } - } - finally { - setLastCommand(null); + catch (Exception e) { + if (responseRequired) { + response = new ExceptionResponse(e); } + } + finally { + setLastCommand(null); + } - if (response instanceof ExceptionResponse) { - if (!responseRequired) { - Throwable cause = ((ExceptionResponse) response).getException(); - serviceException(cause); - response = null; - } + if (response instanceof ExceptionResponse) { + if (!responseRequired) { + Throwable cause = ((ExceptionResponse) response).getException(); + serviceException(cause); + response = null; } } @@ -272,6 +256,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } } + // TODO-NOW: response through operation-context + if (response != null && !protocolManager.isStopping()) { response.setCorrelationId(commandId); dispatchSync(response); @@ -280,6 +266,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } } catch (IOException e) { + + // TODO-NOW: send errors ActiveMQServerLogger.LOGGER.error("error decoding", e); } catch (Throwable t) { @@ -293,135 +281,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } } - private void negotiate(WireFormatInfo command) throws IOException { - this.wireFormat.renegotiateWireFormat(command); - //throw back a brokerInfo here - protocolManager.sendBrokerInfo(this); - } - - @Override - public Object getID() { - return transportConnection.getID(); - } - - @Override - public long getCreationTime() { - return creationTime; - } - - @Override - public String getRemoteAddress() { - return transportConnection.getRemoteAddress(); - } - - @Override - public void addFailureListener(FailureListener listener) { - if (listener == null) { - throw new IllegalStateException("FailureListener cannot be null"); - } - - failureListeners.add(listener); - } - - @Override - public boolean removeFailureListener(FailureListener listener) { - if (listener == null) { - throw new IllegalStateException("FailureListener cannot be null"); - } - - return failureListeners.remove(listener); - } - - @Override - public void addCloseListener(CloseListener listener) { - if (listener == null) { - throw new IllegalStateException("CloseListener cannot be null"); - } - - closeListeners.add(listener); - } - - @Override - public boolean removeCloseListener(CloseListener listener) { - if (listener == null) { - throw new IllegalStateException("CloseListener cannot be null"); - } - - return closeListeners.remove(listener); - } - - @Override - public List removeCloseListeners() { - List ret = new ArrayList<>(closeListeners); - - closeListeners.clear(); - - return ret; - } - - @Override - public void setCloseListeners(List listeners) { - closeListeners.clear(); - - closeListeners.addAll(listeners); - } - - @Override - public List getFailureListeners() { - // we do not return the listeners otherwise the remoting service - // would NOT destroy the connection. - return Collections.emptyList(); - } - - @Override - public List removeFailureListeners() { - List ret = new ArrayList<>(failureListeners); - - failureListeners.clear(); - - return ret; - } - - @Override - public void setFailureListeners(List listeners) { - failureListeners.clear(); - - failureListeners.addAll(listeners); - } - - @Override - public ActiveMQBuffer createTransportBuffer(int size) { - return ActiveMQBuffers.dynamicBuffer(size); - } - - @Override - public void fail(ActiveMQException me) { - fail(me, null); - } - @Override public void destroy() { fail(null, null); } - private void deleteTempQueues() throws Exception { - Iterator tmpQs = tempQueues.iterator(); - while (tmpQs.hasNext()) { - ActiveMQDestination q = tmpQs.next(); - protocolManager.removeDestination(this, q); - } - } - - @Override - public RemotingConnection getRemotingConnection() { - return this; - } - - @Override - public Connection getTransportConnection() { - return this.transportConnection; - } - @Override public boolean isClient() { return false; @@ -466,22 +330,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } } - private void callClosingListeners() { - final List listenersClone = new ArrayList<>(closeListeners); - - for (final CloseListener listener : listenersClone) { - try { - listener.connectionClosed(); - } - catch (final Throwable t) { - // Failure of one listener to execute shouldn't prevent others - // from - // executing - ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t); - } - } - } - // throw a WireFormatInfo to the peer public void init() { WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); @@ -509,34 +357,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } - @Override - public Response processAddConnection(ConnectionInfo info) throws Exception { - //let protoclmanager handle connection add/remove - try { - protocolManager.addConnection(this, info); - } - catch (Exception e) { - if (e instanceof SecurityException) { - // close this down - in case the peer of this transport doesn't play - // nice - delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); - } - Response resp = new ExceptionResponse(e); - return resp; - } - if (info.isManageable() && protocolManager.isUpdateClusterClients()) { - // send ConnectionCommand - ConnectionControl command = protocolManager.newConnectionControl(); - command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration()); - if (info.isFailoverReconnect()) { - command.setRebalanceConnection(false); - } - dispatchAsync(command); - } - return null; - - } - public void dispatchAsync(Command message) { if (!stopping.get()) { dispatchSync(message); @@ -564,7 +384,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S public void serviceExceptionAsync(final IOException e) { if (asyncException.compareAndSet(false, true)) { - // Why this is not through an executor? + // TODO: Why this is not through an executor? new Thread("Async Exception Handler") { @Override public void run() { @@ -585,12 +405,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S try { ConnectionError ce = new ConnectionError(); ce.setException(e); - if (pendingStop) { - dispatchSync(ce); - } - else { - dispatchAsync(ce); - } + dispatchAsync(ce); } finally { inServiceException = false; @@ -649,89 +464,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } } - public void delayedStop(final int waitTime, final String reason, Throwable cause) { - if (waitTime > 0) { - synchronized (this) { - pendingStop = true; - stopError = cause; - } - } - } - - public void stopAsync() { - // If we're in the middle of starting then go no further... for now. - synchronized (this) { - pendingStop = true; - } - if (stopping.compareAndSet(false, true)) { - if (context != null) { - context.getStopping().set(true); - } - } - } - - protected void doStop() throws Exception { - /* - * What's a duplex bridge? try { synchronized (this) { if (duplexBridge != - * null) { duplexBridge.stop(); } } } catch (Exception ignore) { - * LOG.trace("Exception caught stopping. This exception is ignored.", - * ignore); } - */ - try { - getTransportConnection().close(); - } - catch (Exception e) { - // log - } - - // Run the MessageDispatch callbacks so that message references get - // cleaned up. - synchronized (dispatchQueue) { - for (Iterator iter = dispatchQueue.iterator(); iter.hasNext(); ) { - Command command = iter.next(); - if (command.isMessageDispatch()) { - MessageDispatch md = (MessageDispatch) command; - TransmitCallback sub = md.getTransmitCallback(); - protocolManager.postProcessDispatch(md); - if (sub != null) { - sub.onFailure(); - } - } - } - dispatchQueue.clear(); - } - // - // Remove all logical connection associated with this connection - // from the broker. - if (!protocolManager.isStopped()) { - context.getStopping().set(true); - try { - processRemoveConnection(state.getInfo().getConnectionId(), 0L); - } - catch (Throwable ignore) { - ignore.printStackTrace(); - } - } - } - - @Override - public Response processAddConsumer(ConsumerInfo info) { - Response resp = null; - try { - protocolManager.addConsumer(this, info); - } - catch (Exception e) { - e.printStackTrace(); - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else { - resp = new ExceptionResponse(e); - } - } - return resp; - } - public void addConsumerBrokerExchange(ConsumerId id, AMQSession amqSession, Map consumerMap) { @@ -762,222 +494,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S } } - public int getConsumerCount() { - int result = 0; - for (SessionId sessionId : state.getSessionIds()) { - SessionState sessionState = state.getSessionState(sessionId); - if (sessionState != null) { - result += sessionState.getConsumerIds().size(); - } - } - return result; - } - - public int getProducerCount() { - int result = 0; - for (SessionId sessionId : state.getSessionIds()) { - SessionState sessionState = state.getSessionState(sessionId); - if (sessionState != null) { - result += sessionState.getProducerIds().size(); - } - } - return result; - } - - @Override - public Response processAddDestination(DestinationInfo dest) throws Exception { - Response resp = null; - try { - protocolManager.addDestination(this, dest); - } - catch (Exception e) { - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else { - resp = new ExceptionResponse(e); - } - } - return resp; - } - - @Override - public Response processAddProducer(ProducerInfo info) throws Exception { - Response resp = null; - try { - protocolManager.addProducer(this, info); - } - catch (Exception e) { - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else if (e instanceof ActiveMQNonExistentQueueException) { - resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); - } - else { - resp = new ExceptionResponse(e); - } - } - return resp; - } - - @Override - public Response processAddSession(SessionInfo info) throws Exception { - // Avoid replaying dup commands - if (!state.getSessionIds().contains(info.getSessionId())) { - protocolManager.addSession(this, info); - try { - state.addSession(info); - } - catch (IllegalStateException e) { - e.printStackTrace(); - protocolManager.removeSession(context, info); - } - } - return null; - } - - @Override - public Response processBeginTransaction(TransactionInfo info) throws Exception { - TransactionId txId = info.getTransactionId(); - - if (!txMap.containsKey(txId)) { - txMap.put(txId, info); - } - return null; - } - - @Override - public Response processBrokerInfo(BrokerInfo arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { - protocolManager.commitTransactionOnePhase(info); - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); - - return null; - } - - @Override - public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { - protocolManager.commitTransactionTwoPhase(info); - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); - - return null; - } - - @Override - public Response processConnectionControl(ConnectionControl connectionControl) throws Exception { - //activemq5 keeps a var to remember only the faultTolerant flag - //this can be sent over a reconnected transport as the first command - //before restoring the connection. - return null; - } - - @Override - public Response processConnectionError(ConnectionError arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processConsumerControl(ConsumerControl consumerControl) throws Exception { - //amq5 clients send this command to restore prefetchSize - //after successful reconnect - try { - protocolManager.updateConsumer(this, consumerControl); - } - catch (Exception e) { - //log error - } - return null; - } - - @Override - public Response processControlCommand(ControlCommand arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processEndTransaction(TransactionInfo info) throws Exception { - protocolManager.endTransaction(info); - TransactionId txId = info.getTransactionId(); - - if (!txMap.containsKey(txId)) { - txMap.put(txId, info); - } - return null; - } - - @Override - public Response processFlush(FlushCommand arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processForgetTransaction(TransactionInfo info) throws Exception { - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); - - protocolManager.forgetTransaction(info.getTransactionId()); - return null; - } - - @Override - public Response processKeepAlive(KeepAliveInfo arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processMessage(Message messageSend) { - Response resp = null; - try { - ProducerId producerId = messageSend.getProducerId(); - AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); - final AMQConnectionContext pcontext = producerExchange.getConnectionContext(); - final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); - boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode(); - - AMQSession session = protocolManager.getSession(producerId.getParentId()); - - if (producerExchange.canDispatch(messageSend)) { - SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); - if (result.isBlockNextSend()) { - if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) { - throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); - } - - if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) { - //in that case don't send the response - //this will force the client to wait until - //the response is got. - context.setDontSendReponse(true); - } - else { - //hang the connection until the space is available - session.blockingWaitForSpace(producerExchange, result); - } - } - else if (sendProducerAck) { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - this.dispatchAsync(ack); - } - } - } - catch (Throwable e) { - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else { - resp = new ExceptionResponse(e); - } - } - return resp; - } - private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { AMQProducerBrokerExchange result = producerExchanges.get(id); if (result == null) { @@ -1007,163 +523,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S return result; } - @Override - public Response processMessageAck(MessageAck ack) throws Exception { - AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); - consumerBrokerExchange.acknowledge(ack); - return null; - } - - @Override - public Response processMessageDispatch(MessageDispatch arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processMessagePull(MessagePull arg0) throws Exception { - AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId()); - if (amqConsumerBrokerExchange == null) { - throw new IllegalStateException("Consumer does not exist"); - } - amqConsumerBrokerExchange.processMessagePull(arg0); - return null; - } - - @Override - public Response processPrepareTransaction(TransactionInfo info) throws Exception { - protocolManager.prepareTransaction(info); - return null; - } - - @Override - public Response processProducerAck(ProducerAck arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - @Override - public Response processRecoverTransactions(TransactionInfo info) throws Exception { - Set sIds = state.getSessionIds(); - TransactionId[] recovered = protocolManager.recoverTransactions(sIds); - return new DataArrayResponse(recovered); - } - - @Override - public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { - //we let protocol manager to handle connection add/remove - try { - protocolManager.removeConnection(this, state.getInfo(), null); - } - catch (Throwable e) { - // log - } - return null; - } - - @Override - public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { - SessionId sessionId = id.getParentId(); - SessionState ss = state.getSessionState(sessionId); - if (ss == null) { - throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId); - } - ConsumerState consumerState = ss.removeConsumer(id); - if (consumerState == null) { - throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); - } - ConsumerInfo info = consumerState.getInfo(); - info.setLastDeliveredSequenceId(lastDeliveredSequenceId); - - AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id); - - consumerBrokerExchange.removeConsumer(); - - removeConsumerBrokerExchange(id); - - return null; - } - private void removeConsumerBrokerExchange(ConsumerId id) { synchronized (consumerExchanges) { consumerExchanges.remove(id); } } - @Override - public Response processRemoveDestination(DestinationInfo info) throws Exception { - ActiveMQDestination dest = info.getDestination(); - protocolManager.removeDestination(this, dest); - return null; - } - - @Override - public Response processRemoveProducer(ProducerId id) throws Exception { - protocolManager.removeProducer(id); - return null; - } - - @Override - public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { - SessionState session = state.getSessionState(id); - if (session == null) { - throw new IllegalStateException("Cannot remove session that had not been registered: " + id); - } - // Don't let new consumers or producers get added while we are closing - // this down. - session.shutdown(); - // Cascade the connection stop producers. - // we don't stop consumer because in core - // closing the session will do the job - for (ProducerId producerId : session.getProducerIds()) { - try { - processRemoveProducer(producerId); - } - catch (Throwable e) { - // LOG.warn("Failed to remove producer: {}", producerId, e); - } - } - state.removeSession(id); - protocolManager.removeSession(context, session.getInfo()); - return null; - } - - @Override - public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { - protocolManager.removeSubscription(subInfo); - return null; - } - - @Override - public Response processRollbackTransaction(TransactionInfo info) throws Exception { - protocolManager.rollbackTransaction(info); - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); - return null; - } - - @Override - public Response processShutdown(ShutdownInfo info) throws Exception { - this.shutdown(false); - return null; - } - - @Override - public Response processWireFormat(WireFormatInfo arg0) throws Exception { - throw new IllegalStateException("not implemented! "); - } - - public int getMaximumConsumersAllowedPerConnection() { - return 1000000;//this belongs to configuration, now hardcoded - } - - public int getMaximumProducersAllowedPerConnection() { - return 1000000;//this belongs to configuration, now hardcoded - } - public void deliverMessage(MessageDispatch dispatch) { Message m = dispatch.getMessage(); if (m != null) { @@ -1323,4 +688,393 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S context.setReconnect(true); context.incRefCount(); } + + // This will listen for commands throught the protocolmanager + class CommandProcessor implements CommandVisitor { + + @Override + public Response processAddConnection(ConnectionInfo info) throws Exception { + //let protoclmanager handle connection add/remove + try { + protocolManager.addConnection(OpenWireConnection.this, info); + } + catch (Exception e) { + Response resp = new ExceptionResponse(e); + return resp; + } + if (info.isManageable() && protocolManager.isUpdateClusterClients()) { + // send ConnectionCommand + ConnectionControl command = protocolManager.newConnectionControl(); + command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration()); + if (info.isFailoverReconnect()) { + command.setRebalanceConnection(false); + } + dispatchAsync(command); + } + return null; + + } + + @Override + public Response processAddProducer(ProducerInfo info) throws Exception { + Response resp = null; + try { + protocolManager.addProducer(OpenWireConnection.this, info); + } + catch (Exception e) { + if (e instanceof ActiveMQSecurityException) { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else if (e instanceof ActiveMQNonExistentQueueException) { + resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); + } + else { + resp = new ExceptionResponse(e); + } + } + return resp; + } + + @Override + public Response processAddConsumer(ConsumerInfo info) { + Response resp = null; + try { + protocolManager.addConsumer(OpenWireConnection.this, info); + } + catch (Exception e) { + e.printStackTrace(); + if (e instanceof ActiveMQSecurityException) { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else { + resp = new ExceptionResponse(e); + } + } + return resp; + } + + @Override + public Response processRemoveDestination(DestinationInfo info) throws Exception { + ActiveMQDestination dest = info.getDestination(); + protocolManager.removeDestination(OpenWireConnection.this, dest); + return null; + } + + @Override + public Response processRemoveProducer(ProducerId id) throws Exception { + protocolManager.removeProducer(id); + return null; + } + + @Override + public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { + SessionState session = state.getSessionState(id); + if (session == null) { + throw new IllegalStateException("Cannot remove session that had not been registered: " + id); + } + // Don't let new consumers or producers get added while we are closing + // this down. + session.shutdown(); + // Cascade the connection stop producers. + // we don't stop consumer because in core + // closing the session will do the job + for (ProducerId producerId : session.getProducerIds()) { + try { + processRemoveProducer(producerId); + } + catch (Throwable e) { + // LOG.warn("Failed to remove producer: {}", producerId, e); + } + } + state.removeSession(id); + protocolManager.removeSession(context, session.getInfo()); + return null; + } + + @Override + public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { + protocolManager.removeSubscription(subInfo); + return null; + } + + @Override + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + protocolManager.rollbackTransaction(info); + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + return null; + } + + @Override + public Response processShutdown(ShutdownInfo info) throws Exception { + OpenWireConnection.this.shutdown(false); + return null; + } + + @Override + public Response processWireFormat(WireFormatInfo command) throws Exception { + wireFormat.renegotiateWireFormat(command); + //throw back a brokerInfo here + protocolManager.sendBrokerInfo(OpenWireConnection.this); + return null; + } + + @Override + public Response processAddDestination(DestinationInfo dest) throws Exception { + Response resp = null; + try { + protocolManager.addDestination(OpenWireConnection.this, dest); + } + catch (Exception e) { + if (e instanceof ActiveMQSecurityException) { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else { + resp = new ExceptionResponse(e); + } + } + return resp; + } + + @Override + public Response processAddSession(SessionInfo info) throws Exception { + // Avoid replaying dup commands + if (!state.getSessionIds().contains(info.getSessionId())) { + protocolManager.addSession(OpenWireConnection.this, info); + try { + state.addSession(info); + } + catch (IllegalStateException e) { + e.printStackTrace(); + protocolManager.removeSession(context, info); + } + } + return null; + } + + @Override + public Response processBeginTransaction(TransactionInfo info) throws Exception { + TransactionId txId = info.getTransactionId(); + + if (!txMap.containsKey(txId)) { + txMap.put(txId, info); + } + return null; + } + + @Override + public Response processBrokerInfo(BrokerInfo arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + protocolManager.commitTransactionOnePhase(info); + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + + return null; + } + + @Override + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + protocolManager.commitTransactionTwoPhase(info); + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + + return null; + } + + @Override + public Response processConnectionControl(ConnectionControl connectionControl) throws Exception { + //activemq5 keeps a var to remember only the faultTolerant flag + //this can be sent over a reconnected transport as the first command + //before restoring the connection. + return null; + } + + @Override + public Response processConnectionError(ConnectionError arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processConsumerControl(ConsumerControl consumerControl) throws Exception { + //amq5 clients send this command to restore prefetchSize + //after successful reconnect + try { + protocolManager.updateConsumer(OpenWireConnection.this, consumerControl); + } + catch (Exception e) { + //log error + } + return null; + } + + @Override + public Response processControlCommand(ControlCommand arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processEndTransaction(TransactionInfo info) throws Exception { + protocolManager.endTransaction(info); + TransactionId txId = info.getTransactionId(); + + if (!txMap.containsKey(txId)) { + txMap.put(txId, info); + } + return null; + } + + @Override + public Response processFlush(FlushCommand arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processForgetTransaction(TransactionInfo info) throws Exception { + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + + protocolManager.forgetTransaction(info.getTransactionId()); + return null; + } + + @Override + public Response processKeepAlive(KeepAliveInfo arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processMessage(Message messageSend) { + Response resp = null; + try { + ProducerId producerId = messageSend.getProducerId(); + AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); + final AMQConnectionContext pcontext = producerExchange.getConnectionContext(); + final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); + boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode(); + + AMQSession session = protocolManager.getSession(producerId.getParentId()); + + // TODO: canDispatch is always returning true; + if (producerExchange.canDispatch(messageSend)) { + SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); + if (result.isBlockNextSend()) { + if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) { + // TODO see logging + throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); + } + + if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) { + //in that case don't send the response + //this will force the client to wait until + //the response is got. + context.setDontSendReponse(true); + } + else { + //hang the connection until the space is available + session.blockingWaitForSpace(producerExchange, result); + } + } + else if (sendProducerAck) { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + OpenWireConnection.this.dispatchAsync(ack); + } + } + } + catch (Throwable e) { + if (e instanceof ActiveMQSecurityException) { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else { + resp = new ExceptionResponse(e); + } + } + return resp; + } + + @Override + public Response processMessageAck(MessageAck ack) throws Exception { + AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); + consumerBrokerExchange.acknowledge(ack); + return null; + } + + @Override + public Response processMessageDispatch(MessageDispatch arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processMessagePull(MessagePull arg0) throws Exception { + AMQConsumerBrokerExchange amqConsumerBrokerExchange = consumerExchanges.get(arg0.getConsumerId()); + if (amqConsumerBrokerExchange == null) { + throw new IllegalStateException("Consumer does not exist"); + } + amqConsumerBrokerExchange.processMessagePull(arg0); + return null; + } + + @Override + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + protocolManager.prepareTransaction(info); + return null; + } + + @Override + public Response processProducerAck(ProducerAck arg0) throws Exception { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processRecoverTransactions(TransactionInfo info) throws Exception { + Set sIds = state.getSessionIds(); + TransactionId[] recovered = protocolManager.recoverTransactions(sIds); + return new DataArrayResponse(recovered); + } + + @Override + public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { + //we let protocol manager to handle connection add/remove + try { + protocolManager.removeConnection(OpenWireConnection.this, state.getInfo(), null); + } + catch (Throwable e) { + // log + } + return null; + } + + @Override + public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { + SessionId sessionId = id.getParentId(); + SessionState ss = state.getSessionState(sessionId); + if (ss == null) { + throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId); + } + ConsumerState consumerState = ss.removeConsumer(id); + if (consumerState == null) { + throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); + } + ConsumerInfo info = consumerState.getInfo(); + info.setLastDeliveredSequenceId(lastDeliveredSequenceId); + + AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id); + + consumerBrokerExchange.removeConsumer(); + + removeConsumerBrokerExchange(id); + + return null; + } + + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 014181d..440fcce 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -219,7 +219,7 @@ public class OpenWireProtocolManager implements ProtocolManager, No @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); - OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf); + OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, server.getExecutorFactory().getExecutor(), this, wf); owConn.init(); // TODO CLEBERT What is this constant here? we should get it from TTL initial pings @@ -506,9 +506,6 @@ public class OpenWireProtocolManager implements ProtocolManager, No ActiveMQDestination destination = info.getDestination(); if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) { - throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection()); - } if (destination.isQueue()) { OpenWireUtil.validateDestination(destination, amqSession); } @@ -549,12 +546,6 @@ public class OpenWireProtocolManager implements ProtocolManager, No } // Avoid replaying dup commands if (!ss.getConsumerIds().contains(info.getConsumerId())) { - ActiveMQDestination destination = info.getDestination(); - if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (theConn.getConsumerCount() >= theConn.getMaximumConsumersAllowedPerConnection()) { - throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumConsumersAllowedPerConnection()); - } - } AMQSession amqSession = sessions.get(sessionId); if (amqSession == null) { @@ -760,6 +751,9 @@ public class OpenWireProtocolManager implements ProtocolManager, No transactions.remove(xid); } + /** + * TODO: remove this, use the regular ResourceManager from the Server's + * */ public void registerTx(TransactionId txId, AMQSession amqSession) { transactions.put(txId, amqSession); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java index f94c119..e9c4044 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java @@ -29,8 +29,6 @@ public class AMQProducerBrokerExchange { private ProducerState producerState; private boolean mutable = true; private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); - private boolean auditProducerSequenceIds; - private boolean isNetworkProducer; private final FlowControlInfo flowControlInfo = new FlowControlInfo(); public AMQProducerBrokerExchange() { @@ -92,29 +90,34 @@ public class AMQProducerBrokerExchange { * @return false if message should be ignored as a duplicate */ public boolean canDispatch(Message messageSend) { + // TODO: auditProduceSequenceIds is never true boolean canDispatch = true; - if (auditProducerSequenceIds && messageSend.isPersistent()) { - final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId(); - if (isNetworkProducer) { - // messages are multiplexed on this producer so we need to query the - // persistenceAdapter - long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId()); - if (producerSequenceId <= lastStoredForMessageProducer) { - canDispatch = false; - } - } - else if (producerSequenceId <= lastSendSequenceNumber.get()) { - canDispatch = false; - if (messageSend.isInTransaction()) { - } - else { - } - } - else { - // track current so we can suppress duplicates later in the stream - lastSendSequenceNumber.set(producerSequenceId); - } - } + //TODO: DEAD CODE +// if (auditProducerSequenceIds && messageSend.isPersistent()) { +// final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId(); +// if (isNetworkProducer) { +// // messages are multiplexed on this producer so we need to query the +// // persistenceAdapter +// long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId()); +// if (producerSequenceId <= lastStoredForMessageProducer) { +// canDispatch = false; +// } +// } +// else if (producerSequenceId <= lastSendSequenceNumber.get()) { +// canDispatch = false; +// // TODO: WHAT IS THIS? +// if (messageSend.isInTransaction()) { +// +// +// } +// else { +// } +// } +// else { +// // track current so we can suppress duplicates later in the stream +// lastSendSequenceNumber.set(producerSequenceId); +// } +// } return canDispatch; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java index 9ce21e3..a6ca4a0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java @@ -32,6 +32,15 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; public class AMQServerSessionFactory implements ServerSessionFactory { + private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory(); + + public static AMQServerSessionFactory getInstance() { + return singleInstance; + } + + private AMQServerSessionFactory() { + } + @Override public ServerSessionImpl createCoreSession(String name, String username, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index f5ccb82..0cee3d3 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -63,8 +63,8 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { - private AMQServerSession coreSession; private ConnectionInfo connInfo; + private AMQServerSession coreSession; private SessionInfo sessInfo; private ActiveMQServer server; private OpenWireConnection connection; @@ -91,6 +91,7 @@ public class AMQSession implements SessionCallback { OpenWireProtocolManager manager) { this.connInfo = connInfo; this.sessInfo = sessInfo; + this.server = server; this.connection = connection; this.scheduledPool = scheduledPool; @@ -107,7 +108,7 @@ public class AMQSession implements SessionCallback { // now try { - coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, new AMQServerSessionFactory(), true); + coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true); long sessionId = sessInfo.getSessionId().getValue(); if (sessionId == -1) { @@ -144,6 +145,7 @@ public class AMQSession implements SessionCallback { } connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap); + // TODO: This is wrong. We should only start when the client starts coreSession.start(); started.set(true); } @@ -281,6 +283,9 @@ public class AMQSession implements SessionCallback { coreMsg.setAddress(address); PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address); + + + // TODO: Improve this, tested with ProducerFlowControlSendFailTest if (store.isFull()) { result.setBlockNextSend(true); result.setBlockPagingStore(store); @@ -526,8 +531,12 @@ public class AMQSession implements SessionCallback { } } + // TODO: remove this, we should do the same as we do on core for blocking public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange, SendingResult result) throws IOException { + + + new Exception("blocking").printStackTrace(); long start = System.currentTimeMillis(); long nextWarn = start; producerExchange.blockingOnFlowControl(true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e216548/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java new file mode 100644 index 0000000..8ab3815 --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.openwire.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; + +public class OpenWireServerCallback implements SessionCallback { + + @Override + public boolean hasCredits(ServerConsumer consumerID) { + return false; + } + + @Override + public void sendProducerCreditsMessage(int credits, SimpleString address) { + + } + + @Override + public void sendProducerCreditsFailMessage(int credits, SimpleString address) { + + } + + @Override + public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) { + return 0; + } + + @Override + public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) { + return 0; + } + + @Override + public int sendLargeMessageContinuation(ServerConsumer consumerID, + byte[] body, + boolean continues, + boolean requiresResponse) { + return 0; + } + + @Override + public void closed() { + + } + + @Override + public void disconnect(ServerConsumer consumerId, String queueName) { + + } + + @Override + public boolean isWritable(ReadyListener callback) { + return false; + } +}