Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9253717A99 for ; Tue, 11 Nov 2014 18:41:38 +0000 (UTC) Received: (qmail 33065 invoked by uid 500); 11 Nov 2014 18:41:34 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 32980 invoked by uid 500); 11 Nov 2014 18:41:34 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 31316 invoked by uid 99); 11 Nov 2014 18:41:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 18:41:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 84445A0D781; Tue, 11 Nov 2014 18:41:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Tue, 11 Nov 2014 18:42:08 -0000 Message-Id: <918e164ebeea41698cfdc67dc98d5b30@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [38/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionImpl.java new file mode 100644 index 0000000..e912369 --- /dev/null +++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionImpl.java @@ -0,0 +1,1959 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.ClientProducer; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.FailoverEventListener; +import org.apache.activemq6.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq6.api.core.client.SessionFailureListener; +import org.apache.activemq6.core.client.HornetQClientLogger; +import 
org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.remoting.FailureListener; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.spi.core.remoting.ConsumerContext; +import org.apache.activemq6.spi.core.remoting.SessionContext; +import org.apache.activemq6.utils.ConfirmationWindowWarning; +import org.apache.activemq6.utils.TokenBucketLimiterImpl; +import org.apache.activemq6.utils.XidCodecSupport; + +/** + * @author Tim Fox + * @author Ovidiu Feodorov + * @author Clebert Suconic + * @author Jeff Mesnil + * @author Andy Taylor + * @author Martyn Taylor + */ +public final class ClientSessionImpl implements ClientSessionInternal, FailureListener +{ + private final Map metadata = new HashMap(); + + private final ClientSessionFactoryInternal sessionFactory; + + private final String name; + + private final String username; + + private final String password; + + private final boolean xa; + + private final Executor executor; + + // to be sent to consumers as consumers will need a separate consumer for flow control + private final Executor flowControlExecutor; + + /** + * All access to producers are guarded (i.e. synchronized) on itself. 
+ */ + private final Set producers = new HashSet(); + + // Consumers must be an ordered map so if we fail we recreate them in the same order with the same ids + private final Map consumers = new LinkedHashMap(); + + private volatile boolean closed; + + private final boolean autoCommitAcks; + + private final boolean preAcknowledge; + + private final boolean autoCommitSends; + + private final boolean blockOnAcknowledge; + + private final boolean autoGroup; + + private final int ackBatchSize; + + private final int consumerWindowSize; + + private final int consumerMaxRate; + + private final int confirmationWindowSize; + + private final int producerMaxRate; + + private final boolean blockOnNonDurableSend; + + private final boolean blockOnDurableSend; + + private final int minLargeMessageSize; + + private final boolean compressLargeMessages; + + private volatile int initialMessagePacketSize; + + private final boolean cacheLargeMessageClient; + + private final SessionContext sessionContext; + + // For testing only + private boolean forceNotSameRM; + + private final ClientProducerCreditManager producerCreditManager; + + private volatile boolean started; + + private volatile boolean rollbackOnly; + + private volatile boolean workDone; + + private final String groupID; + + private volatile boolean inClose; + + private volatile boolean mayAttemptToFailover = true; + + private volatile SimpleString defaultAddress; + + /** + * Current XID. 
this will be used in case of failover + */ + private Xid currentXID; + + + private final AtomicInteger concurrentCall = new AtomicInteger(0); + + private final ConfirmationWindowWarning confirmationWindowWarning; + + ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory, + final String name, + final String username, + final String password, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final boolean blockOnAcknowledge, + final boolean autoGroup, + final int ackBatchSize, + final int consumerWindowSize, + final int consumerMaxRate, + final int confirmationWindowSize, + final int producerWindowSize, + final int producerMaxRate, + final boolean blockOnNonDurableSend, + final boolean blockOnDurableSend, + final boolean cacheLargeMessageClient, + final int minLargeMessageSize, + final boolean compressLargeMessages, + final int initialMessagePacketSize, + final String groupID, + final SessionContext sessionContext, + final Executor executor, + final Executor flowControlExecutor) throws HornetQException + { + this.sessionFactory = sessionFactory; + + this.name = name; + + this.username = username; + + this.password = password; + + this.executor = executor; + + this.flowControlExecutor = flowControlExecutor; + + this.xa = xa; + + this.autoCommitAcks = autoCommitAcks; + + this.preAcknowledge = preAcknowledge; + + this.autoCommitSends = autoCommitSends; + + this.blockOnAcknowledge = blockOnAcknowledge; + + this.autoGroup = autoGroup; + + this.ackBatchSize = ackBatchSize; + + this.consumerWindowSize = consumerWindowSize; + + this.consumerMaxRate = consumerMaxRate; + + this.confirmationWindowSize = confirmationWindowSize; + + this.producerMaxRate = producerMaxRate; + + this.blockOnNonDurableSend = blockOnNonDurableSend; + + this.blockOnDurableSend = blockOnDurableSend; + + this.cacheLargeMessageClient = cacheLargeMessageClient; + + this.minLargeMessageSize = minLargeMessageSize; + + 
this.compressLargeMessages = compressLargeMessages; + + this.initialMessagePacketSize = initialMessagePacketSize; + + this.groupID = groupID; + + producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize); + + this.sessionContext = sessionContext; + + sessionContext.setSession(this); + + confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning(); + } + + // ClientSession implementation + // ----------------------------------------------------------------- + + public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException + { + internalCreateQueue(address, queueName, null, false, false); + } + + public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws HornetQException + { + internalCreateQueue(address, queueName, null, durable, false); + } + + public void createQueue(final String address, final String queueName, final boolean durable) throws HornetQException + { + createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), durable); + } + + public void createSharedQueue(SimpleString address, + SimpleString queueName, + boolean durable) throws HornetQException + { + createSharedQueue(address, queueName, null, durable); + } + + public void createSharedQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable) throws HornetQException + { + + checkClosed(); + + + startCall(); + try + { + sessionContext.createSharedQueue(address, queueName, filterString, durable); + } + finally + { + endCall(); + } + + } + + + public void createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable) throws HornetQException + { + internalCreateQueue(address, queueName, filterString, durable, false); + } + + public void createQueue(final String address, + final String queueName, + final String filterString, + 
final boolean durable) throws HornetQException + { + createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), durable); + } + + public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws HornetQException + { + internalCreateQueue(address, queueName, null, false, true); + } + + public void createTemporaryQueue(final String address, final String queueName) throws HornetQException + { + internalCreateQueue(SimpleString.toSimpleString(address), + SimpleString.toSimpleString(queueName), + null, + false, + true); + } + + public void createTemporaryQueue(final SimpleString address, final SimpleString queueName, final SimpleString filter) throws HornetQException + { + internalCreateQueue(address, queueName, filter, false, true); + } + + public void createTemporaryQueue(final String address, final String queueName, final String filter) throws HornetQException + { + internalCreateQueue(SimpleString.toSimpleString(address), + SimpleString.toSimpleString(queueName), + SimpleString.toSimpleString(filter), + false, + true); + } + + public void deleteQueue(final SimpleString queueName) throws HornetQException + { + checkClosed(); + + startCall(); + try + { + sessionContext.deleteQueue(queueName); + } + finally + { + endCall(); + } + } + + public void deleteQueue(final String queueName) throws HornetQException + { + deleteQueue(SimpleString.toSimpleString(queueName)); + } + + public QueueQuery queueQuery(final SimpleString queueName) throws HornetQException + { + checkClosed(); + + + startCall(); + try + { + return sessionContext.queueQuery(queueName); + } + finally + { + endCall(); + } + + } + + /** + * Use {@link #addressQuery(org.apache.activemq6.api.core.SimpleString)} instead + * + * @param address + * @return + * @throws HornetQException + */ + @Deprecated + public BindingQuery bindingQuery(final SimpleString address) throws HornetQException + { + return 
(BindingQuery) addressQuery(address); + } + + public AddressQuery addressQuery(final SimpleString address) throws HornetQException + { + checkClosed(); + + + return sessionContext.addressQuery(address); + } + + public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException + { + return createConsumer(queueName, null, false); + } + + public ClientConsumer createConsumer(final String queueName) throws HornetQException + { + return createConsumer(SimpleString.toSimpleString(queueName)); + } + + public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString) throws HornetQException + { + return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, false); + } + + public void createQueue(final String address, final String queueName) throws HornetQException + { + createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName)); + } + + public ClientConsumer createConsumer(final String queueName, final String filterString) throws HornetQException + { + return createConsumer(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString)); + } + + public ClientConsumer createConsumer(final SimpleString queueName, + final SimpleString filterString, + final boolean browseOnly) throws HornetQException + { + return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, browseOnly); + } + + public ClientConsumer createConsumer(final SimpleString queueName, final boolean browseOnly) throws HornetQException + { + return createConsumer(queueName, null, consumerWindowSize, consumerMaxRate, browseOnly); + } + + public ClientConsumer createConsumer(final String queueName, final String filterString, final boolean browseOnly) throws HornetQException + { + return createConsumer(SimpleString.toSimpleString(queueName), + SimpleString.toSimpleString(filterString), + browseOnly); + } + + public ClientConsumer createConsumer(final String 
queueName, final boolean browseOnly) throws HornetQException + { + return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly); + } + + /** + * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on + * the remoting thread). + *

+ * Direct consumers have issues with blocking and failover. E.g. if direct then inside + * MessageHandler call a blocking method like rollback or acknowledge (blocking) This can block + * until failover completes, which disallows the thread to be used to deliver any responses to + * the client during that period, so failover won't occur. If we want direct consumers we need to + * rethink how they work. + */ + public ClientConsumer createConsumer(final SimpleString queueName, + final SimpleString filterString, + final int windowSize, + final int maxRate, + final boolean browseOnly) throws HornetQException + { + return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly); + } + + public ClientConsumer createConsumer(final String queueName, + final String filterString, + final int windowSize, + final int maxRate, + final boolean browseOnly) throws HornetQException + { + return createConsumer(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), windowSize, maxRate, browseOnly); + } + + public ClientProducer createProducer() throws HornetQException + { + return createProducer((SimpleString) null); + } + + public ClientProducer createProducer(final SimpleString address) throws HornetQException + { + return createProducer(address, producerMaxRate); + } + + public ClientProducer createProducer(final String address) throws HornetQException + { + return createProducer(SimpleString.toSimpleString(address)); + } + + public ClientProducer createProducer(final SimpleString address, final int maxRate) throws HornetQException + { + return internalCreateProducer(address, maxRate); + } + + public ClientProducer createProducer(final String address, final int rate) throws HornetQException + { + return createProducer(SimpleString.toSimpleString(address), rate); + } + + public XAResource getXAResource() + { + return this; + } + + private void rollbackOnFailover(boolean outcomeKnown) throws HornetQException + { + rollback(false); 
+ + if (outcomeKnown) + { + throw HornetQClientMessageBundle.BUNDLE.txRolledBack(); + } + + throw HornetQClientMessageBundle.BUNDLE.txOutcomeUnknown(); + } + + public void commit() throws HornetQException + { + checkClosed(); + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("Sending commit"); + } + + /* + * we have failed over since any work was done so we should rollback + * */ + if (rollbackOnly) + { + rollbackOnFailover(true); + } + + flushAcks(); + /* + * if we have failed over whilst flushing the acks then we should rollback and throw exception before attempting to + * commit as committing might actually commit something but we we wouldn't know and rollback after the commit + * */ + if (rollbackOnly) + { + rollbackOnFailover(true); + } + try + { + sessionContext.simpleCommit(); + } + catch (HornetQException e) + { + if (e.getType() == HornetQExceptionType.UNBLOCKED || rollbackOnly) + { + // The call to commit was unlocked on failover, we therefore rollback the tx, + // and throw a transaction rolled back exception instead + //or + //if we have been set to rollbackonly then we have probably failed over and don't know if the tx has committed + rollbackOnFailover(false); + } + else + { + throw e; + } + } + + //oops, we have failed over during the commit and don't know what happened + if (rollbackOnly) + { + rollbackOnFailover(false); + } + + workDone = false; + } + + public boolean isRollbackOnly() + { + return rollbackOnly; + } + + public void rollback() throws HornetQException + { + rollback(false); + } + + public void rollback(final boolean isLastMessageAsDelivered) throws HornetQException + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")"); + } + checkClosed(); + + // We do a "JMS style" rollback where the session is stopped, and the buffer is cancelled back + // first before rolling back + // This 
ensures messages are received in the same order after rollback w.r.t. to messages in the buffer + // For core we could just do a straight rollback, it really depends if we want JMS style semantics or not... + + boolean wasStarted = started; + + if (wasStarted) + { + stop(); + } + + + // We need to make sure we don't get any inflight messages + for (ClientConsumerInternal consumer : cloneConsumers()) + { + consumer.clear(true); + } + + // Acks must be flushed here *after connection is stopped and all onmessages finished executing + flushAcks(); + + sessionContext.simpleRollback(isLastMessageAsDelivered); + + if (wasStarted) + { + start(); + } + + rollbackOnly = false; + } + + public ClientMessage createMessage(final byte type, + final boolean durable, + final long expiration, + final long timestamp, + final byte priority) + { + return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize); + } + + public ClientMessage createMessage(final byte type, final boolean durable) + { + return this.createMessage(type, durable, 0, System.currentTimeMillis(), (byte) 4); + } + + public ClientMessage createMessage(final boolean durable) + { + return this.createMessage((byte) 0, durable); + } + + public boolean isClosed() + { + return closed; + } + + public boolean isAutoCommitSends() + { + return autoCommitSends; + } + + public boolean isAutoCommitAcks() + { + return autoCommitAcks; + } + + public boolean isBlockOnAcknowledge() + { + return blockOnAcknowledge; + } + + public boolean isXA() + { + return xa; + } + + public void resetIfNeeded() throws HornetQException + { + if (rollbackOnly) + { + HornetQClientLogger.LOGGER.resettingSessionAfterFailure(); + rollback(false); + } + } + + public ClientSessionImpl start() throws HornetQException + { + checkClosed(); + + if (!started) + { + for (ClientConsumerInternal clientConsumerInternal : cloneConsumers()) + { + clientConsumerInternal.start(); + } + + sessionContext.sessionStart(); + + started 
= true; + } + + return this; + } + + public void stop() throws HornetQException + { + stop(true); + } + + public void stop(final boolean waitForOnMessage) throws HornetQException + { + checkClosed(); + + if (started) + { + for (ClientConsumerInternal clientConsumerInternal : cloneConsumers()) + { + clientConsumerInternal.stop(waitForOnMessage); + } + + sessionContext.sessionStop(); + + started = false; + } + } + + public void addFailureListener(final SessionFailureListener listener) + { + sessionFactory.addFailureListener(listener); + } + + public boolean removeFailureListener(final SessionFailureListener listener) + { + return sessionFactory.removeFailureListener(listener); + } + + public void addFailoverListener(FailoverEventListener listener) + { + sessionFactory.addFailoverListener(listener); + } + + public boolean removeFailoverListener(FailoverEventListener listener) + { + return sessionFactory.removeFailoverListener(listener); + } + + public int getVersion() + { + return sessionContext.getServerVersion(); + } + + public boolean isClosing() + { + return inClose; + } + + @Override + public String getNodeId() + { + return sessionFactory.getLiveNodeId(); + } + + // ClientSessionInternal implementation + // ------------------------------------------------------------ + + public int getMinLargeMessageSize() + { + return minLargeMessageSize; + } + + public boolean isCompressLargeMessages() + { + return compressLargeMessages; + } + + /** + * @return the cacheLargeMessageClient + */ + public boolean isCacheLargeMessageClient() + { + return cacheLargeMessageClient; + } + + public String getName() + { + return name; + } + + /** + * Acknowledges all messages received by the consumer so far. 
+ */ + public void acknowledge(final ClientConsumer consumer, final Message message) throws HornetQException + { + // if we're pre-acknowledging then we don't need to do anything + if (preAcknowledge) + { + return; + } + + checkClosed(); + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("client ack messageID = " + message.getMessageID()); + } + + startCall(); + try + { + sessionContext.sendACK(false, blockOnAcknowledge, consumer, message); + } + finally + { + endCall(); + } + } + + public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws HornetQException + { + // if we're pre-acknowledging then we don't need to do anything + if (preAcknowledge) + { + return; + } + + checkClosed(); + + startCall(); + try + { + + sessionContext.sendACK(true, blockOnAcknowledge, consumer, message); + } + finally + { + endCall(); + } + } + + public void expire(final ClientConsumer consumer, final Message message) throws HornetQException + { + checkClosed(); + + // We don't send expiries for pre-ack since message will already have been acked on server + if (!preAcknowledge) + { + sessionContext.expireMessage(consumer, message); + } + } + + public void addConsumer(final ClientConsumerInternal consumer) + { + synchronized (consumers) + { + consumers.put(consumer.getConsumerContext(), consumer); + } + } + + public void addProducer(final ClientProducerInternal producer) + { + synchronized (producers) + { + producers.add(producer); + } + } + + public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException + { + synchronized (consumers) + { + consumers.remove(consumer.getConsumerContext()); + } + } + + public void removeProducer(final ClientProducerInternal producer) + { + synchronized (producers) + { + producers.remove(producer); + } + } + + public void handleReceiveMessage(final ConsumerContext consumerID, final ClientMessageInternal message) throws Exception + { + ClientConsumerInternal 
consumer = getConsumer(consumerID); + + if (consumer != null) + { + consumer.handleMessage(message); + } + } + + public void handleReceiveLargeMessage(final ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception + { + ClientConsumerInternal consumer = getConsumer(consumerID); + + if (consumer != null) + { + consumer.handleLargeMessage(clientLargeMessage, largeMessageSize); + } + } + + public void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception + { + ClientConsumerInternal consumer = getConsumer(consumerID); + + if (consumer != null) + { + consumer.handleLargeMessageContinuation(chunk, flowControlSize, isContinues); + } + } + + @Override + public void handleConsumerDisconnect(ConsumerContext context) throws HornetQException + { + final ClientConsumerInternal consumer = getConsumer(context); + + if (consumer != null) + { + executor.execute(new Runnable() + { + @Override + public void run() + { + try + { + consumer.close(); + } + catch (HornetQException e) + { + HornetQClientLogger.LOGGER.unableToCloseConsumer(e); + } + } + }); + } + } + + public void close() throws HornetQException + { + if (closed) + { + HornetQClientLogger.LOGGER.debug("Session was already closed, giving up now, this=" + this); + return; + } + + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("Calling close on session " + this); + } + + try + { + closeChildren(); + + synchronized (this) + { + producerCreditManager.close(); + } + inClose = true; + sessionContext.sessionClose(); + } + catch (Throwable e) + { + // Session close should always return without exception + + // Note - we only log at trace + HornetQClientLogger.LOGGER.trace("Failed to close session", e); + } + + doCleanup(false); + } + + public synchronized void cleanUp(boolean failingOver) throws HornetQException + { + if (closed) + { + return; + } + + 
producerCreditManager.close(); + + cleanUpChildren(); + + doCleanup(failingOver); + } + + public ClientSessionImpl setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) + { + sessionContext.setSendAcknowledgementHandler(handler); + return this; + } + + public void preHandleFailover(RemotingConnection connection) + { + // We lock the channel to prevent any packets to be added to the re-send + // cache during the failover process + //we also do this before the connection fails over to give the session a chance to block for failover + sessionContext.lockCommunications(); + } + + // Needs to be synchronized to prevent issues with occurring concurrently with close() + + public void handleFailover(final RemotingConnection backupConnection, HornetQException cause) + { + synchronized (this) + { + if (closed) + { + return; + } + + boolean resetCreditManager = false; + + try + { + + // TODO remove this and encapsulate it + + boolean reattached = sessionContext.reattachOnNewConnection(backupConnection); + + if (!reattached) + { + + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("ClientSession couldn't be reattached, creating a new session"); + } + + for (ClientConsumerInternal consumer : cloneConsumers()) + { + consumer.clearAtFailover(); + } + + + // The session wasn't found on the server - probably we're failing over onto a backup server where the + // session won't exist or the target server has been restarted - in this case the session will need to be + // recreated, + // and we'll need to recreate any consumers + + // It could also be that the server hasn't been restarted, but the session is currently executing close, + // and + // that + // has already been executed on the server, that's why we can't find the session- in this case we *don't* + // want + // to recreate the session, we just want to unblock the blocking call + if (!inClose && mayAttemptToFailover) + { + sessionContext.recreateSession(username, 
password, + minLargeMessageSize, xa, autoCommitSends, + autoCommitAcks, preAcknowledge, defaultAddress); + + for (Map.Entry entryx : consumers.entrySet()) + { + + ClientConsumerInternal consumerInternal = entryx.getValue(); + + sessionContext.recreateConsumerOnServer(consumerInternal); + } + + if ((!autoCommitAcks || !autoCommitSends) && workDone) + { + // this is protected by a lock, so we can guarantee nothing will sneak here + // while we do our work here + rollbackOnly = true; + } + if (currentXID != null) + { + sessionContext.xaFailed(currentXID); + rollbackOnly = true; + } + + // Now start the session if it was already started + if (started) + { + for (ClientConsumerInternal consumer : cloneConsumers()) + { + consumer.clearAtFailover(); + consumer.start(); + } + + sessionContext.restartSession(); + } + + resetCreditManager = true; + } + + sessionContext.returnBlocking(cause); + } + } + catch (Throwable t) + { + HornetQClientLogger.LOGGER.failedToHandleFailover(t); + } + finally + { + sessionContext.releaseCommunications(); + } + + if (resetCreditManager) + { + producerCreditManager.reset(); + + // Also need to send more credits for consumers, otherwise the system could hand with the server + // not having any credits to send + } + } + + HashMap metaDataToSend; + + synchronized (metadata) + { + metaDataToSend = new HashMap(metadata); + } + + sessionContext.resetMetadata(metaDataToSend); + + + } + + public void addMetaData(String key, String data) throws HornetQException + { + synchronized (metadata) + { + metadata.put(key, data); + } + + sessionContext.addSessionMetadata(key, data); + } + + public void addUniqueMetaData(String key, String data) throws HornetQException + { + sessionContext.addUniqueMetaData(key, data); + } + + public ClientSessionFactory getSessionFactory() + { + return sessionFactory; + } + + public void setAddress(final Message message, final SimpleString address) + { + if (defaultAddress == null) + { + defaultAddress = address; + + 
message.setAddress(address); + } + else + { + if (!address.equals(defaultAddress)) + { + message.setAddress(address); + } + else + { + message.setAddress(null); + } + } + } + + public void setPacketSize(final int packetSize) + { + if (packetSize > this.initialMessagePacketSize) + { + this.initialMessagePacketSize = (int) (packetSize * 1.2); + } + } + + public void workDone() + { + workDone = true; + } + + public void sendProducerCreditsMessage(final int credits, final SimpleString address) + { + sessionContext.sendProducerCreditsMessage(credits, address); + } + + public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon) + { + ClientProducerCredits credits = producerCreditManager.getCredits(address, anon, sessionContext); + + return credits; + } + + public void returnCredits(final SimpleString address) + { + producerCreditManager.returnCredits(address); + } + + public void handleReceiveProducerCredits(final SimpleString address, final int credits) + { + producerCreditManager.receiveCredits(address, credits); + } + + public void handleReceiveProducerFailCredits(final SimpleString address, int credits) + { + producerCreditManager.receiveFailCredits(address, credits); + } + + public ClientProducerCreditManager getProducerCreditManager() + { + return producerCreditManager; + } + + public void startCall() + { + if (concurrentCall.incrementAndGet() > 1) + { + HornetQClientLogger.LOGGER.invalidConcurrentSessionUsage(new Exception("trace")); + } + } + + public void endCall() + { + concurrentCall.decrementAndGet(); + } + + // CommandConfirmationHandler implementation ------------------------------------ + + // TODO: this will be encapsulated by the SessionContext + + // XAResource implementation + // -------------------------------------------------------------------- + + public void commit(final Xid xid, final boolean onePhase) throws XAException + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + 
HornetQClientLogger.LOGGER.trace("call commit(xid=" + convert(xid)); + } + checkXA(); + + // we should never throw rollback if we have already prepared + if (rollbackOnly) + { + HornetQClientLogger.LOGGER.commitAfterFailover(); + } + + // Note - don't need to flush acks since the previous end would have + // done this + + startCall(); + try + { + sessionContext.xaCommit(xid, onePhase); + workDone = false; + } + catch (XAException xae) + { + throw xae; + } + catch (Throwable t) + { + HornetQClientLogger.LOGGER.failoverDuringCommit(); + + // Any error on commit -> RETRY + // We can't rollback a Prepared TX for definition + XAException xaException = new XAException(XAException.XA_RETRY); + xaException.initCause(t); + throw xaException; + } + finally + { + endCall(); + } + } + + public void end(final Xid xid, final int flags) throws XAException + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("Calling end:: " + convert(xid) + ", flags=" + convertTXFlag(flags)); + } + + checkXA(); + + try + { + if (rollbackOnly) + { + try + { + rollback(); + } + catch (Throwable ignored) + { + HornetQClientLogger.LOGGER.debug("Error on rollback during end call!", ignored); + } + throw new XAException(XAException.XA_RBOTHER); + } + + try + { + flushAcks(); + + startCall(); + try + { + sessionContext.xaEnd(xid, flags); + } + finally + { + endCall(); + } + } + catch (XAException xae) + { + throw xae; + } + catch (Throwable t) + { + HornetQClientLogger.LOGGER.errorCallingEnd(t); + // This could occur if the TM interrupts the thread + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(t); + throw xaException; + } + } + finally + { + currentXID = null; + } + } + + public void forget(final Xid xid) throws XAException + { + checkXA(); + startCall(); + try + { + sessionContext.xaForget(xid); + } + catch (XAException xae) + { + throw xae; + } + catch (Throwable t) + { + // This could occur if the TM interrupts 
the thread + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(t); + throw xaException; + } + finally + { + endCall(); + } + } + + public int getTransactionTimeout() throws XAException + { + checkXA(); + + try + { + return sessionContext.recoverSessionTimeout(); + } + catch (Throwable t) + { + // This could occur if the TM interrupts the thread + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(t); + throw xaException; + } + } + + public boolean setTransactionTimeout(final int seconds) throws XAException + { + checkXA(); + + try + { + return sessionContext.configureTransactionTimeout(seconds); + } + catch (Throwable t) + { + // This could occur if the TM interrupts the thread + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(t); + throw xaException; + } + } + + public boolean isSameRM(final XAResource xares) throws XAException + { + checkXA(); + + if (forceNotSameRM) + { + return false; + } + + ClientSessionInternal other = getSessionInternalFromXAResource(xares); + + if (other == null) + { + return false; + } + + String liveNodeId = sessionFactory.getLiveNodeId(); + String otherLiveNodeId = ((ClientSessionFactoryInternal) other.getSessionFactory()).getLiveNodeId(); + + if (liveNodeId != null && otherLiveNodeId != null) + { + return liveNodeId.equals(otherLiveNodeId); + } + + //we shouldn't get here, live node id should always be set + return sessionFactory == other.getSessionFactory(); + } + + private ClientSessionInternal getSessionInternalFromXAResource(final XAResource xares) + { + if (xares == null) + { + return null; + } + if (xares instanceof ClientSessionInternal) + { + return (ClientSessionInternal) xares; + } + else if (xares instanceof HornetQXAResource) + { + return getSessionInternalFromXAResource(((HornetQXAResource)xares).getResource()); + } + return null; + } + + public int prepare(final Xid xid) throws XAException + { + 
checkXA(); + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("Calling prepare:: " + convert(xid)); + } + + + if (rollbackOnly) + { + throw new XAException(XAException.XA_RBOTHER); + } + + // Note - don't need to flush acks since the previous end would have + // done this + + startCall(); + try + { + return sessionContext.xaPrepare(xid); + } + catch (XAException xae) + { + throw xae; + } + catch (HornetQException e) + { + if (e.getType() == HornetQExceptionType.UNBLOCKED) + { + // Unblocked on failover + try + { + // will retry once after failover & unblock + return sessionContext.xaPrepare(xid); + } + catch (Throwable t) + { + // ignore and rollback + } + HornetQClientLogger.LOGGER.failoverDuringPrepareRollingBack(); + try + { + rollback(false); + } + catch (Throwable t) + { + // This could occur if the TM interrupts the thread + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(t); + throw xaException; + } + + HornetQClientLogger.LOGGER.errorDuringPrepare(e); + + throw new XAException(XAException.XA_RBOTHER); + } + + HornetQClientLogger.LOGGER.errorDuringPrepare(e); + + // This should never occur + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(e); + throw xaException; + } + catch (Throwable t) + { + HornetQClientLogger.LOGGER.errorDuringPrepare(t); + + // This could occur if the TM interrupts the thread + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(t); + throw xaException; + } + finally + { + endCall(); + } + } + + public Xid[] recover(final int flags) throws XAException + { + checkXA(); + + if ((flags & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) + { + try + { + return sessionContext.xaScan(); + } + catch (Throwable t) + { + // This could occur if the TM interrupts the thread + XAException xaException = new XAException(XAException.XAER_RMERR); + xaException.initCause(t); + throw 
xaException;
+         }
+      }
+
+      return new Xid[0];
+   }
+
+   /**
+    * Rolls back the transaction branch identified by {@code xid}.
+    * The session is stopped (if it was started), every consumer is cleared and pending acks are
+    * flushed before the rollback is delegated to the session context; the session is started
+    * again afterwards if it was started on entry.
+    *
+    * @param xid the transaction branch to roll back
+    * @throws XAException {@code XA_RETRY} when the call was unblocked by a failover,
+    *                     {@code XAER_RMERR} for any other failure; the cause is always chained
+    */
+   public void rollback(final Xid xid) throws XAException
+   {
+      checkXA();
+
+      if (HornetQClientLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQClientLogger.LOGGER.trace("Calling rollback:: " + convert(xid));
+      }
+
+      try
+      {
+         boolean wasStarted = started;
+
+         if (wasStarted)
+         {
+            stop(false);
+         }
+
+         // We need to make sure we don't get any inflight messages
+         for (ClientConsumerInternal consumer : cloneConsumers())
+         {
+            consumer.clear(false);
+         }
+
+         flushAcks();
+
+         try
+         {
+            sessionContext.xaRollback(xid, wasStarted);
+         }
+         finally
+         {
+            if (wasStarted)
+            {
+               start();
+            }
+         }
+
+         workDone = false;
+      }
+      catch (XAException xae)
+      {
+         throw xae;
+      }
+      catch (HornetQException e)
+      {
+         if (e.getType() == HornetQExceptionType.UNBLOCKED)
+         {
+            // Unblocked on failover; keep the original exception as the cause, as commit() does
+            XAException retryException = new XAException(XAException.XA_RETRY);
+            retryException.initCause(e);
+            throw retryException;
+         }
+
+         // This should never occur
+         XAException xaException = new XAException(XAException.XAER_RMERR);
+         xaException.initCause(e);
+         throw xaException;
+      }
+      catch (Throwable t)
+      {
+         // This could occur if the TM interrupts the thread
+         XAException xaException = new XAException(XAException.XAER_RMERR);
+         xaException.initCause(t);
+         throw xaException;
+      }
+   }
+
+   /**
+    * Starts work on behalf of the transaction branch {@code xid} by delegating to the session
+    * context, and remembers the branch in {@code currentXID} once the call succeeds.
+    * If the first attempt is unblocked ({@code HornetQExceptionType.UNBLOCKED}) the call is
+    * retried exactly once; a successful retry completes normally.
+    *
+    * @param xid   the transaction branch to associate with this session
+    * @param flags the XA flags passed through to the session context
+    * @throws XAException as raised by the session context, or {@code XAER_RMERR} (with the
+    *                     cause chained) for any other failure
+    */
+   public void start(final Xid xid, final int flags) throws XAException
+   {
+      if (HornetQClientLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQClientLogger.LOGGER.trace("Calling start:: " + convert(xid) + " clientXID=" + xid + " flags = " + convertTXFlag(flags));
+      }
+
+      checkXA();
+
+      try
+      {
+         sessionContext.xaStart(xid, flags);
+
+         this.currentXID = xid;
+      }
+      catch (XAException xae)
+      {
+         throw xae;
+      }
+      catch (HornetQException e)
+      {
+         // we can retry this only because we know for sure that no work would have been done
+         if (e.getType() == HornetQExceptionType.UNBLOCKED)
+         {
+            try
+            {
+               sessionContext.xaStart(xid, flags);
+
+               // The retry succeeded: record the branch exactly as the first attempt would have
+               // and return, instead of falling through to the XAER_RMERR path below.
+               this.currentXID = xid;
+               return;
+            }
+            catch (XAException xae)
+            {
+               throw xae;
+            }
+            catch (Throwable t)
+            {
+               // This could occur if the TM interrupts the thread
+               XAException xaException = new XAException(XAException.XAER_RMERR);
+               xaException.initCause(t);
+               throw xaException;
+            }
+         }
+
+         // This should never occur
+         XAException xaException = new XAException(XAException.XAER_RMERR);
+         xaException.initCause(e);
+         throw xaException;
+      }
+      catch (Throwable t)
+      {
+         // This could occur if the TM interrupts the thread
+         XAException xaException = new XAException(XAException.XAER_RMERR);
+         xaException.initCause(t);
+         throw xaException;
+      }
+   }
+
+   // FailureListener implementation --------------------------------------------
+
+   public void connectionFailed(final HornetQException me, boolean failedOver)
+   {
+      try
+      {
+         cleanUp(false);
+      }
+      catch (Exception e)
+      {
+         HornetQClientLogger.LOGGER.failedToCleanupSession(e);
+      }
+   }
+
+   public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID)
+   {
+      connectionFailed(me, failedOver);
+   }
+
+   // Public
+   // ----------------------------------------------------------------------------
+
+   public void setForceNotSameRM(final boolean force)
+   {
+      forceNotSameRM = force;
+   }
+
+   public RemotingConnection getConnection()
+   {
+      return sessionContext.getRemotingConnection();
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuilder buffer = new StringBuilder();
+      synchronized (metadata)
+      {
+         for (Map.Entry entry : metadata.entrySet())
+         {
+            buffer.append(entry.getKey() + "=" + entry.getValue() + ",");
+         }
+      }
+
+      return "ClientSessionImpl [name=" + name +
+         ", username=" +
+         username +
+         ", closed=" +
+         closed +
+         ", factory = " + this.sessionFactory +
+         ", metaData=(" +
+         buffer +
+         ")]@" +
+         Integer.toHexString(hashCode());
+   }
+
+   /**
+    * @param queueName
+    * @param filterString
+    * @param windowSize
+    * @param browseOnly
+    * @return
+    * @throws HornetQException
+    */
+   private ClientConsumer internalCreateConsumer(final SimpleString queueName,
+                                                 final SimpleString filterString,
+                                                 final int windowSize,
+                                                 final int maxRate,
+                                                 final
boolean browseOnly) throws HornetQException + { + checkClosed(); + + ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, windowSize, maxRate, + ackBatchSize, browseOnly, executor, flowControlExecutor); + + addConsumer(consumer); + + // Now we send window size credits to start the consumption + // We even send it if windowSize == -1, since we need to start the + // consumer + + // TODO: this could semantically change on other servers. I know for instance on stomp this is just an ignore + if (windowSize != 0) + { + sessionContext.sendConsumerCredits(consumer, windowSize); + } + + return consumer; + } + + private ClientProducer internalCreateProducer(final SimpleString address, final int maxRate) throws HornetQException + { + checkClosed(); + + ClientProducerInternal producer = new ClientProducerImpl(this, + address, + maxRate == -1 ? null + : new TokenBucketLimiterImpl(maxRate, false), + autoCommitSends && blockOnNonDurableSend, + autoCommitSends && blockOnDurableSend, + autoGroup, + groupID == null ? 
null : new SimpleString(groupID), + minLargeMessageSize, + sessionContext); + + addProducer(producer); + + return producer; + } + + private void internalCreateQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temp) throws HornetQException + { + checkClosed(); + + if (durable && temp) + { + throw HornetQClientMessageBundle.BUNDLE.queueMisConfigured(); + } + + startCall(); + try + { + sessionContext.createQueue(address, queueName, filterString, durable, temp); + } + finally + { + endCall(); + } + } + + private void checkXA() throws XAException + { + if (!xa) + { + HornetQClientLogger.LOGGER.sessionNotXA(); + throw new XAException(XAException.XAER_RMERR); + } + } + + private void checkClosed() throws HornetQException + { + if (closed || inClose) + { + throw HornetQClientMessageBundle.BUNDLE.sessionClosed(); + } + } + + private ClientConsumerInternal getConsumer(final ConsumerContext consumerContext) + { + synchronized (consumers) + { + ClientConsumerInternal consumer = consumers.get(consumerContext); + return consumer; + } + } + + private void doCleanup(boolean failingOver) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("calling cleanup on " + this); + } + + synchronized (this) + { + closed = true; + + sessionContext.cleanup(); + } + + sessionFactory.removeSession(this, failingOver); + } + + private void cleanUpChildren() throws HornetQException + { + Set consumersClone = cloneConsumers(); + + for (ClientConsumerInternal consumer : consumersClone) + { + consumer.cleanUp(); + } + + Set producersClone = cloneProducers(); + + for (ClientProducerInternal producer : producersClone) + { + producer.cleanUp(); + } + } + + /** + * Not part of the interface, used on tests only + * + * @return + */ + public Set cloneProducers() + { + Set producersClone; + + synchronized (producers) + { + producersClone = new HashSet(producers); + } + return 
producersClone; + } + + /** + * Not part of the interface, used on tests only + * + * @return + */ + public Set cloneConsumers() + { + synchronized (consumers) + { + return new HashSet(consumers.values()); + } + } + + private void closeChildren() throws HornetQException + { + Set consumersClone = cloneConsumers(); + + for (ClientConsumer consumer : consumersClone) + { + consumer.close(); + } + + Set producersClone = cloneProducers(); + + for (ClientProducer producer : producersClone) + { + producer.close(); + } + } + + private void flushAcks() throws HornetQException + { + for (ClientConsumerInternal consumer : cloneConsumers()) + { + consumer.flushAcks(); + } + } + + /** + * If you ever tried to debug XIDs you will know what this is about. + * This will serialize and deserialize the XID to the same way it's going to be printed on server logs + * or print-data. + *

+ * This will convert to the same XID deserialized on the Server, hence we will be able to debug eventual stuff + * + * @param xid + * @return + */ + public static Object convert(Xid xid) + { + HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(200); + XidCodecSupport.encodeXid(xid, buffer); + + Object obj = XidCodecSupport.decodeXid(buffer); + + return "xid=" + obj + ",clientXID=" + xid; + } + + private String convertTXFlag(final int flags) + { + if (flags == XAResource.TMSUSPEND) + { + return "SESS_XA_SUSPEND"; + } + else if (flags == XAResource.TMSUCCESS) + { + return "TMSUCCESS"; + } + else if (flags == XAResource.TMFAIL) + { + return "TMFAIL"; + } + else if (flags == XAResource.TMJOIN) + { + return "TMJOIN"; + } + else if (flags == XAResource.TMRESUME) + { + return "TMRESUME"; + } + else if (flags == XAResource.TMNOFLAGS) + { + // Don't need to flush since the previous end will have done this + return "TMNOFLAGS"; + } + else + { + return "XAER_INVAL(" + flags + ")"; + } + } + + @Override + public void setStopSignal() + { + mayAttemptToFailover = false; + } + + @Override + public boolean isConfirmationWindowEnabled() + { + if (confirmationWindowWarning.disabled) + { + if (!confirmationWindowWarning.warningIssued.get()) + { + HornetQClientLogger.LOGGER.confirmationWindowDisabledWarning(); + confirmationWindowWarning.warningIssued.set(true); + } + return false; + } + return true; + } + + @Override + public void scheduleConfirmation(final SendAcknowledgementHandler handler, final Message message) + { + executor.execute(new Runnable() + { + @Override + public void run() + { + handler.sendAcknowledged(message); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionInternal.java ---------------------------------------------------------------------- diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionInternal.java new file mode 100644 index 0000000..2fa6609 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionInternal.java @@ -0,0 +1,121 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.spi.core.remoting.ConsumerContext; + +/** + * A ClientSessionInternal + * + * @author Tim Fox + */ +public interface ClientSessionInternal extends ClientSession +{ + String getName(); + + void acknowledge(ClientConsumer consumer, Message message) throws HornetQException; + + void individualAcknowledge(final ClientConsumer consumer, final Message message) throws HornetQException; + + boolean isCacheLargeMessageClient(); + + int getMinLargeMessageSize(); + + boolean isCompressLargeMessages(); + + void expire(ClientConsumer consumer, Message message) throws HornetQException; + + void addConsumer(ClientConsumerInternal consumer); + + void addProducer(ClientProducerInternal producer); + + void removeConsumer(ClientConsumerInternal consumer) throws HornetQException; + + void removeProducer(ClientProducerInternal producer); + + void handleReceiveMessage(ConsumerContext consumerID, ClientMessageInternal message) throws Exception; + + void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception; + + void handleReceiveContinuation(ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception; + + void handleConsumerDisconnect(ConsumerContext consumerContext) throws HornetQException; + + void preHandleFailover(RemotingConnection connection); + + void handleFailover(RemotingConnection backupConnection, HornetQException cause); + + RemotingConnection 
getConnection(); + + void cleanUp(boolean failingOver) throws HornetQException; + + void setForceNotSameRM(boolean force); + + void workDone(); + + void sendProducerCreditsMessage(int credits, SimpleString address); + + ClientProducerCredits getCredits(SimpleString address, boolean anon); + + void returnCredits(SimpleString address); + + void handleReceiveProducerCredits(SimpleString address, int credits); + + void handleReceiveProducerFailCredits(SimpleString address, int credits); + + ClientProducerCreditManager getProducerCreditManager(); + + void setAddress(Message message, SimpleString address); + + void setPacketSize(int packetSize); + + void resetIfNeeded() throws HornetQException; + + /** + * This is used internally to control and educate the user + * about using the thread boundaries properly. + * if more than one thread is using the session simultaneously + * this will generate a big warning on the docs. + * There are a limited number of places where we can call this such as acks and sends. otherwise we + * could get false warns + */ + void startCall(); + + /** + * @see #startCall() + */ + void endCall(); + + /** + * Sets a stop signal to true. 
This will cancel + */ + void setStopSignal(); + + boolean isConfirmationWindowEnabled(); + + /** + * @param handler + */ + void scheduleConfirmation(SendAcknowledgementHandler handler, Message message); + + boolean isClosing(); + + String getNodeId(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/CompressedLargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/CompressedLargeMessageControllerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/CompressedLargeMessageControllerImpl.java new file mode 100644 index 0000000..63d9a7b --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -0,0 +1,769 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.client.impl; + +import java.io.DataInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.utils.DataConstants; +import org.apache.activemq6.utils.HornetQBufferInputStream; +import org.apache.activemq6.utils.InflaterReader; +import org.apache.activemq6.utils.InflaterWriter; +import org.apache.activemq6.utils.UTF8Util; + +/** + * @author Clebert Suconic + */ +final class CompressedLargeMessageControllerImpl implements LargeMessageController +{ + private static final String OPERATION_NOT_SUPPORTED = "Operation not supported"; + + private final LargeMessageController bufferDelegate; + + public CompressedLargeMessageControllerImpl(final LargeMessageController bufferDelegate) + { + this.bufferDelegate = bufferDelegate; + } + + /** + * + */ + public void discardUnusedPackets() + { + bufferDelegate.discardUnusedPackets(); + } + + /** + * Add a buff to the List, or save it to the OutputStream if set + */ + public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) + { + bufferDelegate.addPacket(chunk, flowControlSize, isContinues); + } + + public synchronized void cancel() + { + bufferDelegate.cancel(); + } + + public synchronized void close() + { + bufferDelegate.cancel(); + } + + public void setOutputStream(final OutputStream output) throws HornetQException + { + bufferDelegate.setOutputStream(new InflaterWriter(output)); + } + + public synchronized void saveBuffer(final OutputStream output) throws HornetQException + { + setOutputStream(output); + waitCompletion(0); + } + + /** + * @param timeWait Milliseconds to Wait. 
0 means forever + */ + public synchronized boolean waitCompletion(final long timeWait) throws HornetQException + { + return bufferDelegate.waitCompletion(timeWait); + } + + @Override + public int capacity() + { + return -1; + } + + DataInputStream dataInput = null; + + private DataInputStream getStream() + { + if (dataInput == null) + { + try + { + InputStream input = new HornetQBufferInputStream(bufferDelegate); + + dataInput = new DataInputStream(new InflaterReader(input)); + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage(), e); + } + + } + return dataInput; + } + + private void positioningNotSupported() + { + throw new IllegalStateException("Position not supported over compressed large messages"); + } + + public byte readByte() + { + try + { + return getStream().readByte(); + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public byte getByte(final int index) + { + positioningNotSupported(); + return 0; + } + + @Override + public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length) + { + positioningNotSupported(); + } + + @Override + public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length) + { + positioningNotSupported(); + } + + @Override + public void getBytes(final int index, final ByteBuffer dst) + { + positioningNotSupported(); + } + + @Override + public int getInt(final int index) + { + positioningNotSupported(); + return 0; + } + + @Override + public long getLong(final int index) + { + positioningNotSupported(); + return 0; + } + + @Override + public short getShort(final int index) + { + positioningNotSupported(); + return 0; + } + + @Override + public void setByte(final int index, final byte value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length) + { + throw new 
IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void setBytes(final int index, final byte[] src, final int srcIndex, final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void setBytes(final int index, final ByteBuffer src) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void setInt(final int index, final int value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void setLong(final int index, final long value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void setShort(final int index, final short value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public ByteBuffer toByteBuffer(final int index, final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public int readerIndex() + { + return 0; + } + + public void readerIndex(final int readerIndex) + { + // TODO + } + + public int writerIndex() + { + // TODO + return 0; + } + + public long getSize() + { + return this.bufferDelegate.getSize(); + } + + public void writerIndex(final int writerIndex) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void setIndex(final int readerIndex, final int writerIndex) + { + positioningNotSupported(); + } + + public void clear() + { + } + + public boolean readable() + { + return true; + } + + public boolean writable() + { + return false; + } + + public int readableBytes() + { + return 1; + } + + public int writableBytes() + { + return 0; + } + + public void markReaderIndex() + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void resetReaderIndex() + { + // TODO: reset positioning if possible + } + + public void markWriterIndex() + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void resetWriterIndex() + { + throw new 
IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void discardReadBytes() + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public short getUnsignedByte(final int index) + { + return (short) (getByte(index) & 0xFF); + } + + public int getUnsignedShort(final int index) + { + return getShort(index) & 0xFFFF; + } + + public long getUnsignedInt(final int index) + { + return getInt(index) & 0xFFFFFFFFL; + } + + public void getBytes(int index, final byte[] dst) + { + // TODO: optimize this by using System.arraycopy + for (int i = 0; i < dst.length; i++) + { + dst[i] = getByte(index++); + } + } + + public void getBytes(final int index, final HornetQBuffer dst) + { + getBytes(index, dst, dst.writableBytes()); + } + + public void getBytes(final int index, final HornetQBuffer dst, final int length) + { + if (length > dst.writableBytes()) + { + throw new IndexOutOfBoundsException(); + } + getBytes(index, dst, dst.writerIndex(), length); + dst.writerIndex(dst.writerIndex() + length); + } + + public void setBytes(final int index, final byte[] src) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void setBytes(final int index, final HornetQBuffer src) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void setBytes(final int index, final HornetQBuffer src, final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public short readUnsignedByte() + { + try + { + return (short) getStream().readUnsignedByte(); + } + catch (Exception e) + { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public short readShort() + { + try + { + return getStream().readShort(); + } + catch (Exception e) + { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public int readUnsignedShort() + { + try + { + return getStream().readUnsignedShort(); + } + catch (Exception e) + { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public int readInt() + { 
+ try + { + return getStream().readInt(); + } + catch (Exception e) + { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public long readUnsignedInt() + { + return readInt() & 0xFFFFFFFFL; + } + + public long readLong() + { + try + { + return getStream().readLong(); + } + catch (Exception e) + { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public void readBytes(final byte[] dst, final int dstIndex, final int length) + { + try + { + int nReadBytes = getStream().read(dst, dstIndex, length); + if (nReadBytes < length) + { + HornetQClientLogger.LOGGER.compressedLargeMessageError(length, nReadBytes); + } + } + catch (Exception e) + { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public void readBytes(final byte[] dst) + { + readBytes(dst, 0, dst.length); + } + + public void readBytes(final HornetQBuffer dst) + { + readBytes(dst, dst.writableBytes()); + } + + public void readBytes(final HornetQBuffer dst, final int length) + { + if (length > dst.writableBytes()) + { + throw new IndexOutOfBoundsException(); + } + readBytes(dst, dst.writerIndex(), length); + dst.writerIndex(dst.writerIndex() + length); + } + + public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length) + { + byte[] destBytes = new byte[length]; + readBytes(destBytes); + dst.setBytes(dstIndex, destBytes); + } + + public void readBytes(final ByteBuffer dst) + { + byte[] bytesToGet = new byte[dst.remaining()]; + readBytes(bytesToGet); + dst.put(bytesToGet); + } + + public void skipBytes(final int length) + { + + try + { + for (int i = 0; i < length; i++) + { + getStream().read(); + } + } + catch (Exception e) + { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public void writeByte(final byte value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeShort(final short value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeInt(final int 
value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeLong(final long value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeBytes(final byte[] src, final int srcIndex, final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeBytes(final byte[] src) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeBytes(final HornetQBuffer src, final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeBytes(final ByteBuffer src) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public ByteBuffer toByteBuffer() + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public Object getUnderlyingBuffer() + { + return this; + } + + @Override + public boolean readBoolean() + { + return readByte() != 0; + } + + @Override + public char readChar() + { + return (char) readShort(); + } + + public char getChar(final int index) + { + return (char) getShort(index); + } + + public double getDouble(final int index) + { + return Double.longBitsToDouble(getLong(index)); + } + + public float getFloat(final int index) + { + return Float.intBitsToFloat(getInt(index)); + } + + public HornetQBuffer readBytes(final int length) + { + byte[] bytesToGet = new byte[length]; + readBytes(bytesToGet); + return HornetQBuffers.wrappedBuffer(bytesToGet); + } + + @Override + public double readDouble() + { + return Double.longBitsToDouble(readLong()); + } + + @Override + public float readFloat() + { + return Float.intBitsToFloat(readInt()); + } + + @Override + public SimpleString readNullableSimpleString() + { + int b = readByte(); + if (b == DataConstants.NULL) + { + return null; + } + else + { + return readSimpleString(); + } + } + + @Override + public String readNullableString() + { + int b = readByte(); + if (b == DataConstants.NULL) + { + return null; + } + else + { + return 
readString(); + } + } + + @Override + public SimpleString readSimpleString() + { + int len = readInt(); + byte[] data = new byte[len]; + readBytes(data); + return new SimpleString(data); + } + + @Override + public String readString() + { + int len = readInt(); + + if (len < 9) + { + char[] chars = new char[len]; + for (int i = 0; i < len; i++) + { + chars[i] = (char) readShort(); + } + return new String(chars); + } + else if (len < 0xfff) + { + return readUTF(); + } + else + { + return readSimpleString().toString(); + } + } + + @Override + public String readUTF() + { + return UTF8Util.readUTF(this); + } + + @Override + public void writeBoolean(final boolean val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void writeChar(final char val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void writeDouble(final double val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + + } + + @Override + public void writeFloat(final float val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + + } + + @Override + public void writeNullableSimpleString(final SimpleString val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void writeNullableString(final String val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void writeSimpleString(final SimpleString val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void writeString(final String val) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override + public void writeUTF(final String utf) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public HornetQBuffer copy() + { + throw new UnsupportedOperationException(); + } + + public HornetQBuffer slice(final int index, final int length) + { + throw new UnsupportedOperationException(); + } + + // Inner classes 
------------------------------------------------- + + public ByteBuf byteBuf() + { + return null; + } + + public HornetQBuffer copy(final int index, final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public HornetQBuffer duplicate() + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public HornetQBuffer readSlice(final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void setChar(final int index, final char value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void setDouble(final int index, final double value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void setFloat(final int index, final float value) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public HornetQBuffer slice() + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length) + { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } +}