Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 81D74B3A for ; Wed, 27 Apr 2011 17:35:04 +0000 (UTC) Received: (qmail 49784 invoked by uid 500); 27 Apr 2011 17:35:04 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 49757 invoked by uid 500); 27 Apr 2011 17:35:04 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 49734 invoked by uid 99); 27 Apr 2011 17:35:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2011 17:35:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2011 17:35:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 898F62388CB5; Wed, 27 Apr 2011 17:33:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1097189 [42/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME... Date: Wed, 27 Apr 2011 17:33:09 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110427173319.898F62388CB5@eris.apache.org> Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.state; + +import org.apache.activemq.apollo.openwire.command.BrokerInfo; +import org.apache.activemq.apollo.openwire.command.ConnectionControl; +import org.apache.activemq.apollo.openwire.command.ConnectionError; +import org.apache.activemq.apollo.openwire.command.ConnectionId; +import org.apache.activemq.apollo.openwire.command.ConnectionInfo; +import org.apache.activemq.apollo.openwire.command.ConsumerControl; +import org.apache.activemq.apollo.openwire.command.ConsumerId; +import org.apache.activemq.apollo.openwire.command.ConsumerInfo; +import org.apache.activemq.apollo.openwire.command.ControlCommand; +import org.apache.activemq.apollo.openwire.command.DestinationInfo; +import org.apache.activemq.apollo.openwire.command.FlushCommand; +import org.apache.activemq.apollo.openwire.command.KeepAliveInfo; +import org.apache.activemq.apollo.openwire.command.Message; +import org.apache.activemq.apollo.openwire.command.MessageAck; +import org.apache.activemq.apollo.openwire.command.MessageDispatch; +import org.apache.activemq.apollo.openwire.command.MessageDispatchNotification; +import org.apache.activemq.apollo.openwire.command.MessagePull; +import org.apache.activemq.apollo.openwire.command.ProducerAck; +import org.apache.activemq.apollo.openwire.command.ProducerId; +import org.apache.activemq.apollo.openwire.command.ProducerInfo; +import org.apache.activemq.apollo.openwire.command.RemoveInfo; +import org.apache.activemq.apollo.openwire.command.RemoveSubscriptionInfo; +import org.apache.activemq.apollo.openwire.command.Response; +import org.apache.activemq.apollo.openwire.command.SessionId; +import org.apache.activemq.apollo.openwire.command.SessionInfo; +import org.apache.activemq.apollo.openwire.command.ShutdownInfo; +import org.apache.activemq.apollo.openwire.command.TransactionInfo; +import org.apache.activemq.apollo.openwire.command.WireFormatInfo; + +public class CommandVisitorAdapter implements CommandVisitor { + + public Response processAddConnection(ConnectionInfo info) throws Exception { + return null; + } + + public Response processAddConsumer(ConsumerInfo info) throws Exception { + return null; + } + + public Response processAddDestination(DestinationInfo info) throws Exception { + return null; + } + + public Response processAddProducer(ProducerInfo info) throws Exception { + return null; + } + + public Response processAddSession(SessionInfo info) throws Exception { + return null; + } + + public Response processBeginTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processBrokerInfo(BrokerInfo info) throws Exception { + return null; + } + + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + return null; + } + + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + return null; + } + + public Response processEndTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processFlush(FlushCommand command) throws Exception { + return null; + } + + public Response processForgetTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processKeepAlive(KeepAliveInfo info) throws Exception { + return null; + } + + public Response processMessage(Message send) throws Exception { + return null; + } + + public Response processMessageAck(MessageAck ack) throws Exception { + return null; + } + + public Response processMessageDispatchNotification(MessageDispatchNotification notification) + throws Exception { + return null; + } + + public Response processMessagePull(MessagePull pull) throws Exception { + return null; + } + + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processProducerAck(ProducerAck ack) throws Exception { + return null; + } + + public Response processRecoverTransactions(TransactionInfo info) throws Exception { + return null; + } + + public Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception { + return null; + } + + public Response processRemoveConsumer(RemoveInfo info, ConsumerId id, long lastDeliveredSequenceId) throws Exception { + return null; + } + + public Response processRemoveDestination(DestinationInfo info) throws Exception { + return null; + } + + public Response processRemoveProducer(RemoveInfo info, ProducerId id) throws Exception { + return null; + } + + public Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception { + return null; + } + + public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { + return null; + } + + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + return null; + } + + public Response processShutdown(ShutdownInfo info) throws Exception { + return null; + } + + public Response processWireFormat(WireFormatInfo info) throws Exception { + return null; + } + + public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { + return null; + } + + public Response processControlCommand(ControlCommand command) throws Exception { + return null; + } + + public Response processConnectionControl(ConnectionControl control) throws Exception { + return null; + } + + public Response processConnectionError(ConnectionError error) throws Exception { + return null; + } + + public Response processConsumerControl(ConsumerControl control) throws Exception { + return null; + } + +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.apollo.openwire.support.state; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.apollo.openwire.command.ActiveMQDestination; +import org.apache.activemq.apollo.openwire.command.ConnectionInfo; +import org.apache.activemq.apollo.openwire.command.DestinationInfo; +import org.apache.activemq.apollo.openwire.command.SessionId; +import org.apache.activemq.apollo.openwire.command.SessionInfo; +import org.apache.activemq.apollo.openwire.command.TransactionId; + +public class ConnectionState { + + ConnectionInfo info; + private final ConcurrentHashMap transactions = new ConcurrentHashMap(); + private final ConcurrentHashMap sessions = new ConcurrentHashMap(); + private final List tempDestinations = Collections.synchronizedList(new ArrayList()); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + public ConnectionState(ConnectionInfo info) { + this.info = info; + // Add the default session id. + addSession(new SessionInfo(info, -1)); + } + + public String toString() { + return info.toString(); + } + + public void reset(ConnectionInfo info) { + this.info = info; + transactions.clear(); + sessions.clear(); + tempDestinations.clear(); + shutdown.set(false); + // Add the default session id. + addSession(new SessionInfo(info, -1)); + } + + public void addTempDestination(DestinationInfo info) { + checkShutdown(); + tempDestinations.add(info); + } + + public void removeTempDestination(ActiveMQDestination destination) { + for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) { + DestinationInfo di = iter.next(); + if (di.getDestination().equals(destination)) { + iter.remove(); + } + } + } + + public void addTransactionState(TransactionId id) { + checkShutdown(); + transactions.put(id, new TransactionState(id)); + } + + public TransactionState getTransactionState(TransactionId id) { + return transactions.get(id); + } + + public Collection getTransactionStates() { + return transactions.values(); + } + + public TransactionState removeTransactionState(TransactionId id) { + return transactions.remove(id); + } + + public void addSession(SessionInfo info) { + checkShutdown(); + sessions.put(info.getSessionId(), new SessionState(info)); + } + + public SessionState removeSession(SessionId id) { + return sessions.remove(id); + } + + public SessionState getSessionState(SessionId id) { + return sessions.get(id); + } + + public ConnectionInfo getInfo() { + return info; + } + + public Set getSessionIds() { + return sessions.keySet(); + } + + public List getTempDesinations() { + return tempDestinations; + } + + public Collection getSessionStates() { + return sessions.values(); + } + + private void checkShutdown() { + if (shutdown.get()) { + throw new IllegalStateException("Disposed"); + } + } + + public void shutdown() { + if (shutdown.compareAndSet(false, true)) { + for (Iterator iter = sessions.values().iterator(); iter.hasNext();) { + SessionState ss = iter.next(); + ss.shutdown(); + } + } + } +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,526 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.state; + +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.apollo.util.IOExceptionSupport; +import org.apache.activemq.apollo.util.VoidFunction1; +import org.apache.activemq.apollo.openwire.command.Command; +import org.apache.activemq.apollo.openwire.command.ConnectionId; +import org.apache.activemq.apollo.openwire.command.ConnectionInfo; +import org.apache.activemq.apollo.openwire.command.ConsumerId; +import org.apache.activemq.apollo.openwire.command.ConsumerInfo; +import org.apache.activemq.apollo.openwire.command.DestinationInfo; +import org.apache.activemq.apollo.openwire.command.Message; +import org.apache.activemq.apollo.openwire.command.MessageId; +import org.apache.activemq.apollo.openwire.command.ProducerId; +import org.apache.activemq.apollo.openwire.command.ProducerInfo; +import org.apache.activemq.apollo.openwire.command.Response; +import org.apache.activemq.apollo.openwire.command.SessionId; +import org.apache.activemq.apollo.openwire.command.SessionInfo; +import org.apache.activemq.apollo.openwire.command.TransactionInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Tracks the state of a connection so a newly established transport can be + * re-initialized to the state that was tracked. + * + */ +public class ConnectionStateTracker extends CommandVisitorAdapter { + private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class); + + private static final org.apache.activemq.apollo.openwire.support.state.Tracked TRACKED_RESPONSE_MARKER = new org.apache.activemq.apollo.openwire.support.state.Tracked(null); + + protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); + + private boolean trackTransactions; + private boolean restoreSessions = true; + private boolean restoreConsumers = true; + private boolean restoreProducers = true; + private boolean restoreTransaction = true; + private boolean trackMessages = true; + private int maxCacheSize = 128 * 1024; + private int currentCacheSize; + private Map messageCache = new LinkedHashMap(){ + protected boolean removeEldestEntry(Map.Entry eldest) { + boolean result = currentCacheSize > maxCacheSize; + if (result) { + currentCacheSize -= eldest.getValue().getSize(); + } + return result; + } + }; + + + private class RemoveTransactionAction implements Runnable { + private final TransactionInfo info; + + public RemoveTransactionAction(TransactionInfo info) { + this.info = info; + } + + public void run() { + ConnectionId connectionId = info.getConnectionId(); + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + cs.removeTransactionState(info.getTransactionId()); + } + } + + /** + * + * + * @param command + * @return null if the command is not state tracked. + * @throws IOException + */ + public org.apache.activemq.apollo.openwire.support.state.Tracked track(Command command) throws IOException { + try { + return (org.apache.activemq.apollo.openwire.support.state.Tracked)command.visit(this); + } catch (IOException e) { + throw e; + } catch (Throwable e) { + throw IOExceptionSupport.create(e); + } + } + + public void trackBack(Command command) { + if (trackMessages && command != null && command.isMessage()) { + Message message = (Message) command; + if (message.getTransactionId()==null) { + currentCacheSize = currentCacheSize + message.getSize(); + } + } + } + + public void restore(VoidFunction1 transport) throws IOException { + // Restore the connections. + for (Iterator iter = connectionStates.values().iterator(); iter.hasNext();) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState = iter.next(); + transport.apply(connectionState.getInfo()); + restoreTempDestinations(transport, connectionState); + + if (restoreSessions) { + restoreSessions(transport, connectionState); + } + + if (restoreTransaction) { + restoreTransactions(transport, connectionState); + } + } + //now flush messages + for (Message msg:messageCache.values()) { + transport.apply(msg); + } + } + + private void restoreTransactions(VoidFunction1 transport, org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState) throws IOException { + for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) { + TransactionState transactionState = (TransactionState)iter.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("tx: " + transactionState.getId()); + } + for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) { + Command command = (Command)iterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("tx replay: " + command); + } + transport.apply(command); + } + } + } + + /** + * @param transport + * @param connectionState + * @throws IOException + */ + protected void restoreSessions(VoidFunction1 transport, org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState) throws IOException { + // Restore the connection's sessions + for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { + SessionState sessionState = (SessionState)iter2.next(); + transport.apply(sessionState.getInfo()); + + if (restoreProducers) { + restoreProducers(transport, sessionState); + } + + if (restoreConsumers) { + restoreConsumers(transport, sessionState); + } + } + } + + /** + * @param transport + * @param sessionState + * @throws IOException + */ + protected void restoreConsumers(VoidFunction1 transport, SessionState sessionState) throws IOException { + // Restore the session's consumers + for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) { + org.apache.activemq.apollo.openwire.support.state.ConsumerState consumerState = (org.apache.activemq.apollo.openwire.support.state.ConsumerState)iter3.next(); + transport.apply(consumerState.getInfo()); + } + } + + /** + * @param transport + * @param sessionState + * @throws IOException + */ + protected void restoreProducers(VoidFunction1 transport, SessionState sessionState) throws IOException { + // Restore the session's producers + for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { + org.apache.activemq.apollo.openwire.support.state.ProducerState producerState = (org.apache.activemq.apollo.openwire.support.state.ProducerState)iter3.next(); + transport.apply(producerState.getInfo()); + } + } + + /** + * @param transport + * @param connectionState + * @throws IOException + */ + protected void restoreTempDestinations(VoidFunction1 transport, org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState) + throws IOException { + // Restore the connection's temp destinations. + for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) { + transport.apply((DestinationInfo)iter2.next()); + } + } + + public Response processAddDestination(DestinationInfo info) { + if (info != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(info.getConnectionId()); + if (cs != null && info.getDestination().isTemporary()) { + cs.addTempDestination(info); + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveDestination(DestinationInfo info) { + if (info != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(info.getConnectionId()); + if (cs != null && info.getDestination().isTemporary()) { + cs.removeTempDestination(info.getDestination()); + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processAddProducer(ProducerInfo info) { + if (info != null && info.getProducerId() != null) { + SessionId sessionId = info.getProducerId().getParentId(); + if (sessionId != null) { + ConnectionId connectionId = sessionId.getParentId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + SessionState ss = cs.getSessionState(sessionId); + if (ss != null) { + ss.addProducer(info); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveProducer(ProducerId id) { + if (id != null) { + SessionId sessionId = id.getParentId(); + if (sessionId != null) { + ConnectionId connectionId = sessionId.getParentId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + SessionState ss = cs.getSessionState(sessionId); + if (ss != null) { + ss.removeProducer(id); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processAddConsumer(ConsumerInfo info) { + if (info != null) { + SessionId sessionId = info.getConsumerId().getParentId(); + if (sessionId != null) { + ConnectionId connectionId = sessionId.getParentId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + SessionState ss = cs.getSessionState(sessionId); + if (ss != null) { + ss.addConsumer(info); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) { + if (id != null) { + SessionId sessionId = id.getParentId(); + if (sessionId != null) { + ConnectionId connectionId = sessionId.getParentId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + SessionState ss = cs.getSessionState(sessionId); + if (ss != null) { + ss.removeConsumer(id); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processAddSession(SessionInfo info) { + if (info != null) { + ConnectionId connectionId = info.getSessionId().getParentId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + cs.addSession(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) { + if (id != null) { + ConnectionId connectionId = id.getParentId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + cs.removeSession(id); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processAddConnection(ConnectionInfo info) { + if (info != null) { + connectionStates.put(info.getConnectionId(), new org.apache.activemq.apollo.openwire.support.state.ConnectionState(info)); + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { + if (id != null) { + connectionStates.remove(id); + } + return TRACKED_RESPONSE_MARKER; + } + + public Response processMessage(Message send) throws Exception { + if (send != null) { + if (trackTransactions && send.getTransactionId() != null) { + ConnectionId connectionId = send.getProducerId().getParentId().getParentId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(send); + } + } + } + return TRACKED_RESPONSE_MARKER; + }else if (trackMessages) { + messageCache.put(send.getMessageId(), send.copy()); + } + } + return null; + } + + public Response processBeginTransaction(TransactionInfo info) { + if (trackTransactions && info != null && info.getTransactionId() != null) { + ConnectionId connectionId = info.getConnectionId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + cs.addTransactionState(info.getTransactionId()); + TransactionState state = cs.getTransactionState(info.getTransactionId()); + state.addCommand(info); + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + if (trackTransactions && info != null) { + ConnectionId connectionId = info.getConnectionId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + if (trackTransactions && info != null) { + ConnectionId connectionId = info.getConnectionId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(info); + return new org.apache.activemq.apollo.openwire.support.state.Tracked(new RemoveTransactionAction(info)); + } + } + } + } + return null; + } + + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + if (trackTransactions && info != null) { + ConnectionId connectionId = info.getConnectionId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(info); + return new org.apache.activemq.apollo.openwire.support.state.Tracked(new RemoveTransactionAction(info)); + } + } + } + } + return null; + } + + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + if (trackTransactions && info != null) { + ConnectionId connectionId = info.getConnectionId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(info); + return new org.apache.activemq.apollo.openwire.support.state.Tracked(new RemoveTransactionAction(info)); + } + } + } + } + return null; + } + + public Response processEndTransaction(TransactionInfo info) throws Exception { + if (trackTransactions && info != null) { + ConnectionId connectionId = info.getConnectionId(); + if (connectionId != null) { + org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public boolean isRestoreConsumers() { + return restoreConsumers; + } + + public void setRestoreConsumers(boolean restoreConsumers) { + this.restoreConsumers = restoreConsumers; + } + + public boolean isRestoreProducers() { + return restoreProducers; + } + + public void setRestoreProducers(boolean restoreProducers) { + this.restoreProducers = restoreProducers; + } + + public boolean isRestoreSessions() { + return restoreSessions; + } + + public void setRestoreSessions(boolean restoreSessions) { + this.restoreSessions = restoreSessions; + } + + public boolean isTrackTransactions() { + return trackTransactions; + } + + public void setTrackTransactions(boolean trackTransactions) { + this.trackTransactions = trackTransactions; + } + + public boolean isRestoreTransaction() { + return restoreTransaction; + } + + public void setRestoreTransaction(boolean restoreTransaction) { + this.restoreTransaction = restoreTransaction; + } + + public boolean isTrackMessages() { + return trackMessages; + } + + public void setTrackMessages(boolean trackMessages) { + this.trackMessages = trackMessages; + } + + public int getMaxCacheSize() { + return maxCacheSize; + } + + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.apollo.openwire.support.state; + +import org.apache.activemq.apollo.openwire.command.ConsumerInfo; + +public class ConsumerState { + final ConsumerInfo info; + + public ConsumerState(ConsumerInfo info) { + this.info = info; + } + public String toString() { + return info.toString(); + } + public ConsumerInfo getInfo() { + return info; + } +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.apollo.openwire.support.state; + +import org.apache.activemq.apollo.openwire.command.ProducerInfo; + +public class ProducerState { + final ProducerInfo info; + + public ProducerState(ProducerInfo info) { + this.info = info; + } + + public String toString() { + return info.toString(); + } + + public ProducerInfo getInfo() { + return info; + } + +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.apollo.openwire.support.state; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.apollo.openwire.command.ConsumerId; +import org.apache.activemq.apollo.openwire.command.ConsumerInfo; +import org.apache.activemq.apollo.openwire.command.ProducerId; +import org.apache.activemq.apollo.openwire.command.ProducerInfo; +import org.apache.activemq.apollo.openwire.command.SessionInfo; + +public class SessionState { + final SessionInfo info; + + private final Map producers = new ConcurrentHashMap(); + private final Map consumers = new ConcurrentHashMap(); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + public SessionState(SessionInfo info) { + this.info = info; + } + + public String toString() { + return info.toString(); + } + + public void addProducer(ProducerInfo info) { + checkShutdown(); + producers.put(info.getProducerId(), new org.apache.activemq.apollo.openwire.support.state.ProducerState(info)); + } + + public org.apache.activemq.apollo.openwire.support.state.ProducerState removeProducer(ProducerId id) { + return producers.remove(id); + } + + public void addConsumer(ConsumerInfo info) { + checkShutdown(); + consumers.put(info.getConsumerId(), new org.apache.activemq.apollo.openwire.support.state.ConsumerState(info)); + } + + public org.apache.activemq.apollo.openwire.support.state.ConsumerState removeConsumer(ConsumerId id) { + return consumers.remove(id); + } + + public SessionInfo getInfo() { + return info; + } + + public Set getConsumerIds() { + return consumers.keySet(); + } + + public Set getProducerIds() { + return producers.keySet(); + } + + public Collection getProducerStates() { + return producers.values(); + } + + public org.apache.activemq.apollo.openwire.support.state.ProducerState getProducerState(ProducerId producerId) { + return producers.get(producerId); + } + + public Collection getConsumerStates() { + return consumers.values(); + } + + public org.apache.activemq.apollo.openwire.support.state.ConsumerState getConsumerState(ConsumerId consumerId) { + return consumers.get(consumerId); + } + + private void checkShutdown() { + if (shutdown.get()) { + throw new IllegalStateException("Disposed"); + } + } + + public void shutdown() { + shutdown.set(false); + } + +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.state; + +import org.apache.activemq.apollo.openwire.command.Response; + +public class Tracked extends Response { + + private Runnable runnable; + + public Tracked(Runnable runnable) { + this.runnable = runnable; + } + + public void onResponses() { + if (runnable != null) { + runnable.run(); + runnable = null; + } + } + + public boolean isWaitingForResponse() { + return runnable != null; + } + +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.state; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.apollo.openwire.command.Command; +import org.apache.activemq.apollo.openwire.command.TransactionId; + +public class TransactionState { + + private final List commands = new ArrayList(); + private final TransactionId id; + private final AtomicBoolean shutdown = new AtomicBoolean(false); + private boolean prepared; + private int preparedResult; + + public TransactionState(TransactionId id) { + this.id = id; + } + + public String toString() { + return id.toString(); + } + + public void addCommand(Command operation) { + checkShutdown(); + commands.add(operation); + } + + public List getCommands() { + return commands; + } + + private void checkShutdown() { + if (shutdown.get()) { + throw new IllegalStateException("Disposed"); + } + } + + public void shutdown() { + shutdown.set(false); + } + + public TransactionId getId() { + return id; + } + + public void setPrepared(boolean prepared) { + this.prepared = prepared; + } + + public boolean isPrepared() { + return prepared; + } + + public void setPreparedResult(int preparedResult) { + this.preparedResult = preparedResult; + } + + public int getPreparedResult() { + return preparedResult; + } + +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties Wed Apr 27 17:32:51 2011 @@ -0,0 +1,35 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests.. +# +log4j.rootLogger=WARN, console, file +log4j.logger.org.apache.activemq=TRACE + +# Console will only display warnnings +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n +log4j.appender.console.threshold=TRACE + +# File appender will contain all info messages +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n +log4j.appender.file.file=target/test.log +log4j.appender.file.append=true Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml Wed Apr 27 17:32:51 2011 @@ -0,0 +1,28 @@ + + + + + This broker configuration is what the unit tests in this module load up. + + + localhost + + + + + \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties Wed Apr 27 17:32:51 2011 @@ -0,0 +1,35 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests.. +# +log4j.rootLogger=WARN, console, file +log4j.logger.org.apache.activemq=TRACE + +# Console will only display warnnings +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n +log4j.appender.console.threshold=TRACE + +# File appender will contain all info messages +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n +log4j.appender.file.file=target/test.log +log4j.appender.file.append=true Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala Wed Apr 27 17:32:51 2011 @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.apollo.openwire + +/** + * Copyright (C) 2010, FuseSource Corp. All rights reserved. + */ +import java.lang.String +import org.apache.activemq.apollo.broker.{Broker, BrokerFactory} +import org.apache.activemq.apollo.util.{Logging, FunSuiteSupport, ServiceControl} +import javax.jms._ +import org.apache.activemq.ActiveMQConnectionFactory +import org.scalatest.BeforeAndAfterEach +import org.scalatest.matchers.ShouldMatchers +import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue} + +/** + *

+ *

+ * + * @author Hiram Chirino + */ +class OpenwireTest extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging { + + var broker: Broker = null + var port = 0 + + val broker_config_uri = "xml:classpath:apollo-openwire.xml" + + override protected def beforeAll() = { + info("Loading broker configuration from the classpath with URI: " + broker_config_uri) + broker = BrokerFactory.createBroker(broker_config_uri) + ServiceControl.start(broker, "Starting broker") + port = broker.get_socket_address.getPort + } + + var default_connection:Connection = _ + var connections = List[Connection]() + + override protected def afterAll() = { + broker.stop + } + + override protected def afterEach() = { + super.afterEach + connections.foreach(_.close) + connections = Nil + default_connection = null + } + + def create_connection_factory = new ActiveMQConnectionFactory("tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port)) + def create_connection: Connection = create_connection_factory.createConnection + def queue(value:String) = new ActiveMQQueue(value); + def topic(value:String) = new ActiveMQTopic(value); + + def connect(start:Boolean=true) = { + val connection = create_connection + connections ::= connection + if(default_connection==null) { + default_connection = connection + } + if( start ) { + connection.start + } + connection + } + + test("Queue order preserved") {x} + def x = { + println("connecting") + connect() + println("connectted") + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(queue("example")) + def put(id:Int) = { + producer.send(session.createTextMessage("message:"+id)) + } + + List(1,2,3).foreach(put _) + + val consumer = session.createConsumer(queue("example")) + + def get(id:Int) = { + val m = consumer.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(queue("example")) + m.getText should equal ("message:"+id) + } + + List(1,2,3).foreach(get _) + } + + test("Topic drops messages sent before before subscription is established") { + connect() + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(topic("example")) + def put(id:Int) = { + producer.send(session.createTextMessage("message:"+id)) + } + + put(1) + + val consumer = session.createConsumer(topic("example")) + + put(2) + put(3) + + def get(id:Int) = { + val m = consumer.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(topic("example")) + m.getText should equal ("message:"+id) + } + + List(2,3).foreach(get _) + } + + + test("Topic /w Durable sub retains messages.") { + + def create_durable_sub() = { + val connection = connect(false) + connection.setClientID("test") + connection.start + val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val subscriber = session.createDurableSubscriber(topic("example"), "test") + session.close + connection.close + if (default_connection == connection) { + default_connection = null + } + } + + create_durable_sub + + connect(false) + default_connection.setClientID("test") + default_connection.start + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(topic("example")) + def put(id:Int) = { + producer.send(session.createTextMessage("message:"+id)) + } + + List(1,2,3).foreach(put _) + + val subscriber = session.createDurableSubscriber(topic("example"), "test") + + def get(id:Int) = { + val m = subscriber.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(topic("example")) + m.getText should equal ("message:"+id) + } + + List(1,2,3).foreach(get _) + } + + test("Queue and a selector") { + connect() + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(queue("example")) + + def put(id:Int, color:String) = { + val message = session.createTextMessage("message:" + id) + message.setStringProperty("color", color) + producer.send(message) + } + + List((1, "red"), (2, "blue"), (3, "red")).foreach { + case (id, color) => put(id, color) + } + + val consumer = session.createConsumer(queue("example"), "color='red'") + + def get(id:Int) = { + val m = consumer.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(queue("example")) + m.getText should equal ("message:"+id) + } + + get(1) + get(3) + } +} \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.codec; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import junit.framework.AssertionFailedError; +import junit.framework.TestCase; + +/** + */ +public class BooleanStreamTest extends TestCase { + + protected OpenWireFormat openWireformat; + protected int endOfStreamMarker = 0x12345678; + int numberOfBytes = 8 * 200; + + interface BooleanValueSet { + boolean getBooleanValueFor(int index, int count); + } + + public void testBooleanMarshallingUsingAllTrue() throws Exception { + testBooleanStream(numberOfBytes, new BooleanValueSet() { + public boolean getBooleanValueFor(int index, int count) { + return true; + } + }); + } + + public void testBooleanMarshallingUsingAllFalse() throws Exception { + testBooleanStream(numberOfBytes, new BooleanValueSet() { + public boolean getBooleanValueFor(int index, int count) { + return false; + } + }); + } + + public void testBooleanMarshallingUsingOddAlternateTrueFalse() throws Exception { + testBooleanStream(numberOfBytes, new BooleanValueSet() { + public boolean getBooleanValueFor(int index, int count) { + return (index & 1) == 0; + } + }); + } + + public void testBooleanMarshallingUsingEvenAlternateTrueFalse() throws Exception { + testBooleanStream(numberOfBytes, new BooleanValueSet() { + public boolean getBooleanValueFor(int index, int count) { + return (index & 1) != 0; + } + }); + } + + protected void testBooleanStream(int numberOfBytes, BooleanValueSet valueSet) throws Exception { + for (int i = 0; i < numberOfBytes; i++) { + try { + assertMarshalBooleans(i, valueSet); + } catch (Throwable e) { + throw (AssertionFailedError)new AssertionFailedError("Iteration failed at: " + i).initCause(e); + } + } + } + + protected void assertMarshalBooleans(int count, BooleanValueSet valueSet) throws Exception { + BooleanStream bs = new BooleanStream(); + for (int i = 0; i < count; i++) { + bs.writeBoolean(valueSet.getBooleanValueFor(i, count)); + } + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + DataOutputStream ds = new DataOutputStream(buffer); + bs.marshal(ds); + ds.writeInt(endOfStreamMarker); + + // now lets read from the stream + ds.close(); + + ByteArrayInputStream in = new ByteArrayInputStream(buffer.toByteArray()); + DataInputStream dis = new DataInputStream(in); + bs = new BooleanStream(); + try { + bs.unmarshal(dis); + } catch (Exception e) { + e.printStackTrace(); + fail("Failed to unmarshal: " + count + " booleans: " + e); + } + + for (int i = 0; i < count; i++) { + boolean expected = valueSet.getBooleanValueFor(i, count); + // /System.out.println("Unmarshaling value: " + i + " = " + expected + // + " out of: " + count); + + try { + boolean actual = bs.readBoolean(); + assertEquals("value of object: " + i + " was: " + actual, expected, actual); + } catch (IOException e) { + e.printStackTrace(); + fail("Failed to parse boolean: " + i + " out of: " + count + " due to: " + e); + } + } + int marker = dis.readInt(); + assertEquals("Marker int when unmarshalling: " + count + " booleans", Integer.toHexString(endOfStreamMarker), Integer.toHexString(marker)); + + // lets try read and we should get an exception + try { + byte value = dis.readByte(); + fail("Should have reached the end of the stream"); + } catch (IOException e) { + // worked! + } + } + + protected void setUp() throws Exception { + super.setUp(); + openWireformat = createOpenWireFormat(); + } + + protected OpenWireFormat createOpenWireFormat() { + OpenWireFormat wf = new OpenWireFormat(); + wf.setCacheEnabled(true); + wf.setStackTraceEnabled(false); + wf.setVersion(1); + return wf; + } + +} Modified: activemq/activemq-apollo/trunk/pom.xml URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1097189&r1=1097188&r2=1097189&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/pom.xml (original) +++ activemq/activemq-apollo/trunk/pom.xml Wed Apr 27 17:32:51 2011 @@ -498,6 +498,7 @@ apollo-cassandra apollo-hawtdb + apollo-openwire