activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1097189 [42/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME...
Date Wed, 27 Apr 2011 17:33:09 GMT
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.state;
+
+import org.apache.activemq.apollo.openwire.command.BrokerInfo;
+import org.apache.activemq.apollo.openwire.command.ConnectionControl;
+import org.apache.activemq.apollo.openwire.command.ConnectionError;
+import org.apache.activemq.apollo.openwire.command.ConnectionId;
+import org.apache.activemq.apollo.openwire.command.ConnectionInfo;
+import org.apache.activemq.apollo.openwire.command.ConsumerControl;
+import org.apache.activemq.apollo.openwire.command.ConsumerId;
+import org.apache.activemq.apollo.openwire.command.ConsumerInfo;
+import org.apache.activemq.apollo.openwire.command.ControlCommand;
+import org.apache.activemq.apollo.openwire.command.DestinationInfo;
+import org.apache.activemq.apollo.openwire.command.FlushCommand;
+import org.apache.activemq.apollo.openwire.command.KeepAliveInfo;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.command.MessageAck;
+import org.apache.activemq.apollo.openwire.command.MessageDispatch;
+import org.apache.activemq.apollo.openwire.command.MessageDispatchNotification;
+import org.apache.activemq.apollo.openwire.command.MessagePull;
+import org.apache.activemq.apollo.openwire.command.ProducerAck;
+import org.apache.activemq.apollo.openwire.command.ProducerId;
+import org.apache.activemq.apollo.openwire.command.ProducerInfo;
+import org.apache.activemq.apollo.openwire.command.RemoveInfo;
+import org.apache.activemq.apollo.openwire.command.RemoveSubscriptionInfo;
+import org.apache.activemq.apollo.openwire.command.Response;
+import org.apache.activemq.apollo.openwire.command.SessionId;
+import org.apache.activemq.apollo.openwire.command.SessionInfo;
+import org.apache.activemq.apollo.openwire.command.ShutdownInfo;
+import org.apache.activemq.apollo.openwire.command.TransactionInfo;
+import org.apache.activemq.apollo.openwire.command.WireFormatInfo;
+
+public class CommandVisitorAdapter implements CommandVisitor {
+
+    public Response processAddConnection(ConnectionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processAddConsumer(ConsumerInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processAddDestination(DestinationInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processAddProducer(ProducerInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processAddSession(SessionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processBeginTransaction(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processBrokerInfo(BrokerInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processEndTransaction(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processFlush(FlushCommand command) throws Exception {
+        return null;
+    }
+
+    public Response processForgetTransaction(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processMessage(Message send) throws Exception {
+        return null;
+    }
+
+    public Response processMessageAck(MessageAck ack) throws Exception {
+        return null;
+    }
+
+    public Response processMessageDispatchNotification(MessageDispatchNotification notification)
+        throws Exception {
+        return null;
+    }
+
+    public Response processMessagePull(MessagePull pull) throws Exception {
+        return null;
+    }
+
+    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processProducerAck(ProducerAck ack) throws Exception {
+        return null;
+    }
+
+    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+        return null;
+    }
+
+    public Response processRemoveConsumer(RemoveInfo info, ConsumerId id, long lastDeliveredSequenceId) throws Exception {
+        return null;
+    }
+
+    public Response processRemoveDestination(DestinationInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processRemoveProducer(RemoveInfo info, ProducerId id) throws Exception {
+        return null;
+    }
+
+    public Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception {
+        return null;
+    }
+
+    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processShutdown(ShutdownInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processWireFormat(WireFormatInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
+        return null;
+    }
+
+    public Response processControlCommand(ControlCommand command) throws Exception {
+        return null;
+    }
+
+    public Response processConnectionControl(ConnectionControl control) throws Exception {
+        return null;
+    }
+
+    public Response processConnectionError(ConnectionError error) throws Exception {
+        return null;
+    }
+
+    public Response processConsumerControl(ConsumerControl control) throws Exception {
+        return null;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support.state;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.apollo.openwire.command.ActiveMQDestination;
+import org.apache.activemq.apollo.openwire.command.ConnectionInfo;
+import org.apache.activemq.apollo.openwire.command.DestinationInfo;
+import org.apache.activemq.apollo.openwire.command.SessionId;
+import org.apache.activemq.apollo.openwire.command.SessionInfo;
+import org.apache.activemq.apollo.openwire.command.TransactionId;
+
+public class ConnectionState {
+
+    ConnectionInfo info;
+    private final ConcurrentHashMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>();
+    private final ConcurrentHashMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>();
+    private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>());
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+    public ConnectionState(ConnectionInfo info) {
+        this.info = info;
+        // Add the default session id.
+        addSession(new SessionInfo(info, -1));
+    }
+
+    public String toString() {
+        return info.toString();
+    }
+
+    public void reset(ConnectionInfo info) {
+        this.info = info;
+        transactions.clear();
+        sessions.clear();
+        tempDestinations.clear();
+        shutdown.set(false);
+        // Add the default session id.
+        addSession(new SessionInfo(info, -1));
+    }
+
+    public void addTempDestination(DestinationInfo info) {
+        checkShutdown();
+        tempDestinations.add(info);
+    }
+
+    public void removeTempDestination(ActiveMQDestination destination) {
+        for (Iterator<DestinationInfo> iter = tempDestinations.iterator(); iter.hasNext();) {
+            DestinationInfo di = iter.next();
+            if (di.getDestination().equals(destination)) {
+                iter.remove();
+            }
+        }
+    }
+
+    public void addTransactionState(TransactionId id) {
+        checkShutdown();
+        transactions.put(id, new TransactionState(id));
+    }
+
+    public TransactionState getTransactionState(TransactionId id) {
+        return transactions.get(id);
+    }
+
+    public Collection<TransactionState> getTransactionStates() {
+        return transactions.values();
+    }
+
+    public TransactionState removeTransactionState(TransactionId id) {
+        return transactions.remove(id);
+    }
+
+    public void addSession(SessionInfo info) {
+        checkShutdown();
+        sessions.put(info.getSessionId(), new SessionState(info));
+    }
+
+    public SessionState removeSession(SessionId id) {
+        return sessions.remove(id);
+    }
+
+    public SessionState getSessionState(SessionId id) {
+        return sessions.get(id);
+    }
+
+    public ConnectionInfo getInfo() {
+        return info;
+    }
+
+    public Set<SessionId> getSessionIds() {
+        return sessions.keySet();
+    }
+
+    public List<DestinationInfo> getTempDesinations() {
+        return tempDestinations;
+    }
+
+    public Collection<SessionState> getSessionStates() {
+        return sessions.values();
+    }
+
+    private void checkShutdown() {
+        if (shutdown.get()) {
+            throw new IllegalStateException("Disposed");
+        }
+    }
+
+    public void shutdown() {
+        if (shutdown.compareAndSet(false, true)) {
+            for (Iterator<SessionState> iter = sessions.values().iterator(); iter.hasNext();) {
+                SessionState ss = iter.next();
+                ss.shutdown();
+            }
+        }
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.state;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.apollo.util.IOExceptionSupport;
+import org.apache.activemq.apollo.util.VoidFunction1;
+import org.apache.activemq.apollo.openwire.command.Command;
+import org.apache.activemq.apollo.openwire.command.ConnectionId;
+import org.apache.activemq.apollo.openwire.command.ConnectionInfo;
+import org.apache.activemq.apollo.openwire.command.ConsumerId;
+import org.apache.activemq.apollo.openwire.command.ConsumerInfo;
+import org.apache.activemq.apollo.openwire.command.DestinationInfo;
+import org.apache.activemq.apollo.openwire.command.Message;
+import org.apache.activemq.apollo.openwire.command.MessageId;
+import org.apache.activemq.apollo.openwire.command.ProducerId;
+import org.apache.activemq.apollo.openwire.command.ProducerInfo;
+import org.apache.activemq.apollo.openwire.command.Response;
+import org.apache.activemq.apollo.openwire.command.SessionId;
+import org.apache.activemq.apollo.openwire.command.SessionInfo;
+import org.apache.activemq.apollo.openwire.command.TransactionInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tracks the state of a connection so a newly established transport can be
+ * re-initialized to the state that was tracked.
+ * 
+ */
+public class ConnectionStateTracker extends CommandVisitorAdapter {
+    private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class);
+
+    private static final org.apache.activemq.apollo.openwire.support.state.Tracked TRACKED_RESPONSE_MARKER = new org.apache.activemq.apollo.openwire.support.state.Tracked(null);
+
+    protected final ConcurrentHashMap<ConnectionId, org.apache.activemq.apollo.openwire.support.state.ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, org.apache.activemq.apollo.openwire.support.state.ConnectionState>();
+     
+    private boolean trackTransactions;
+    private boolean restoreSessions = true;
+    private boolean restoreConsumers = true;
+    private boolean restoreProducers = true;
+    private boolean restoreTransaction = true;
+    private boolean trackMessages = true;
+    private int maxCacheSize = 128 * 1024;
+    private int currentCacheSize;
+    private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
+        protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
+            boolean result = currentCacheSize > maxCacheSize;
+            if (result) {
+                currentCacheSize -= eldest.getValue().getSize();
+            }
+            return result;
+        }
+    };
+    
+    
+    private class RemoveTransactionAction implements Runnable {
+        private final TransactionInfo info;
+
+        public RemoveTransactionAction(TransactionInfo info) {
+            this.info = info;
+        }
+
+        public void run() {
+            ConnectionId connectionId = info.getConnectionId();
+            org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+            cs.removeTransactionState(info.getTransactionId());
+        }
+    }
+
+    /**
+     * 
+     * 
+     * @param command
+     * @return null if the command is not state tracked.
+     * @throws IOException
+     */
+    public org.apache.activemq.apollo.openwire.support.state.Tracked track(Command command) throws IOException {
+        try {
+            return (org.apache.activemq.apollo.openwire.support.state.Tracked)command.visit(this);
+        } catch (IOException e) {
+            throw e;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+    
+    public void trackBack(Command command) {
+        if (trackMessages && command != null && command.isMessage()) {
+            Message message = (Message) command;
+            if (message.getTransactionId()==null) {
+                currentCacheSize = currentCacheSize +  message.getSize();
+            }
+        }
+    }
+
+    public void restore(VoidFunction1<Command> transport) throws IOException {
+        // Restore the connections.
+        for (Iterator<org.apache.activemq.apollo.openwire.support.state.ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
+            org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState = iter.next();
+            transport.apply(connectionState.getInfo());
+            restoreTempDestinations(transport, connectionState);
+
+            if (restoreSessions) {
+                restoreSessions(transport, connectionState);
+            }
+
+            if (restoreTransaction) {
+                restoreTransactions(transport, connectionState);
+            }
+        }
+        //now flush messages
+        for (Message msg:messageCache.values()) {
+            transport.apply(msg);
+        }
+    }
+
+    private void restoreTransactions(VoidFunction1<Command> transport, org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState) throws IOException {
+        for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
+            TransactionState transactionState = (TransactionState)iter.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("tx: " + transactionState.getId());
+            }
+            for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
+                Command command = (Command)iterator.next();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx replay: " + command);
+                }
+                transport.apply(command);
+            }
+        }
+    }
+
+    /**
+     * @param transport
+     * @param connectionState
+     * @throws IOException
+     */
+    protected void restoreSessions(VoidFunction1<Command> transport, org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState) throws IOException {
+        // Restore the connection's sessions
+        for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
+            SessionState sessionState = (SessionState)iter2.next();
+            transport.apply(sessionState.getInfo());
+
+            if (restoreProducers) {
+                restoreProducers(transport, sessionState);
+            }
+
+            if (restoreConsumers) {
+                restoreConsumers(transport, sessionState);
+            }
+        }
+    }
+
+    /**
+     * @param transport
+     * @param sessionState
+     * @throws IOException
+     */
+    protected void restoreConsumers(VoidFunction1<Command> transport, SessionState sessionState) throws IOException {
+        // Restore the session's consumers
+        for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) {
+            org.apache.activemq.apollo.openwire.support.state.ConsumerState consumerState = (org.apache.activemq.apollo.openwire.support.state.ConsumerState)iter3.next();
+            transport.apply(consumerState.getInfo());
+        }
+    }
+
+    /**
+     * @param transport
+     * @param sessionState
+     * @throws IOException
+     */
+    protected void restoreProducers(VoidFunction1<Command> transport, SessionState sessionState) throws IOException {
+        // Restore the session's producers
+        for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
+            org.apache.activemq.apollo.openwire.support.state.ProducerState producerState = (org.apache.activemq.apollo.openwire.support.state.ProducerState)iter3.next();
+            transport.apply(producerState.getInfo());
+        }
+    }
+
+    /**
+     * @param transport
+     * @param connectionState
+     * @throws IOException
+     */
+    protected void restoreTempDestinations(VoidFunction1<Command> transport, org.apache.activemq.apollo.openwire.support.state.ConnectionState connectionState)
+        throws IOException {
+        // Restore the connection's temp destinations.
+        for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) {
+            transport.apply((DestinationInfo)iter2.next());
+        }
+    }
+
+    public Response processAddDestination(DestinationInfo info) {
+        if (info != null) {
+            org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(info.getConnectionId());
+            if (cs != null && info.getDestination().isTemporary()) {
+                cs.addTempDestination(info);
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processRemoveDestination(DestinationInfo info) {
+        if (info != null) {
+            org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(info.getConnectionId());
+            if (cs != null && info.getDestination().isTemporary()) {
+                cs.removeTempDestination(info.getDestination());
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processAddProducer(ProducerInfo info) {
+        if (info != null && info.getProducerId() != null) {
+            SessionId sessionId = info.getProducerId().getParentId();
+            if (sessionId != null) {
+                ConnectionId connectionId = sessionId.getParentId();
+                if (connectionId != null) {
+                    org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                    if (cs != null) {
+                        SessionState ss = cs.getSessionState(sessionId);
+                        if (ss != null) {
+                            ss.addProducer(info);
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processRemoveProducer(ProducerId id) {
+        if (id != null) {
+            SessionId sessionId = id.getParentId();
+            if (sessionId != null) {
+                ConnectionId connectionId = sessionId.getParentId();
+                if (connectionId != null) {
+                    org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                    if (cs != null) {
+                        SessionState ss = cs.getSessionState(sessionId);
+                        if (ss != null) {
+                            ss.removeProducer(id);
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processAddConsumer(ConsumerInfo info) {
+        if (info != null) {
+            SessionId sessionId = info.getConsumerId().getParentId();
+            if (sessionId != null) {
+                ConnectionId connectionId = sessionId.getParentId();
+                if (connectionId != null) {
+                    org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                    if (cs != null) {
+                        SessionState ss = cs.getSessionState(sessionId);
+                        if (ss != null) {
+                            ss.addConsumer(info);
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
+        if (id != null) {
+            SessionId sessionId = id.getParentId();
+            if (sessionId != null) {
+                ConnectionId connectionId = sessionId.getParentId();
+                if (connectionId != null) {
+                    org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                    if (cs != null) {
+                        SessionState ss = cs.getSessionState(sessionId);
+                        if (ss != null) {
+                            ss.removeConsumer(id);
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processAddSession(SessionInfo info) {
+        if (info != null) {
+            ConnectionId connectionId = info.getSessionId().getParentId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    cs.addSession(info);
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
+        if (id != null) {
+            ConnectionId connectionId = id.getParentId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    cs.removeSession(id);
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processAddConnection(ConnectionInfo info) {
+        if (info != null) {
+            connectionStates.put(info.getConnectionId(), new org.apache.activemq.apollo.openwire.support.state.ConnectionState(info));
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+        if (id != null) {
+            connectionStates.remove(id);
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+
+    public Response processMessage(Message send) throws Exception {
+        if (send != null) {
+            if (trackTransactions && send.getTransactionId() != null) {
+                ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+                if (connectionId != null) {
+                    org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                    if (cs != null) {
+                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+                        if (transactionState != null) {
+                            transactionState.addCommand(send);
+                        }
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }else if (trackMessages) {
+                messageCache.put(send.getMessageId(), send.copy());
+            }
+        }
+        return null;
+    }
+
+    public Response processBeginTransaction(TransactionInfo info) {
+        if (trackTransactions && info != null && info.getTransactionId() != null) {
+            ConnectionId connectionId = info.getConnectionId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    cs.addTransactionState(info.getTransactionId());
+                    TransactionState state = cs.getTransactionState(info.getTransactionId());
+                    state.addCommand(info);
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+        return null;
+    }
+
+    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+        if (trackTransactions && info != null) {
+            ConnectionId connectionId = info.getConnectionId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+                    if (transactionState != null) {
+                        transactionState.addCommand(info);
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+        return null;
+    }
+
+    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+        if (trackTransactions && info != null) {
+            ConnectionId connectionId = info.getConnectionId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+                    if (transactionState != null) {
+                        transactionState.addCommand(info);
+                        return new org.apache.activemq.apollo.openwire.support.state.Tracked(new RemoveTransactionAction(info));
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+        if (trackTransactions && info != null) {
+            ConnectionId connectionId = info.getConnectionId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+                    if (transactionState != null) {
+                        transactionState.addCommand(info);
+                        return new org.apache.activemq.apollo.openwire.support.state.Tracked(new RemoveTransactionAction(info));
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+        if (trackTransactions && info != null) {
+            ConnectionId connectionId = info.getConnectionId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+                    if (transactionState != null) {
+                        transactionState.addCommand(info);
+                        return new org.apache.activemq.apollo.openwire.support.state.Tracked(new RemoveTransactionAction(info));
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public Response processEndTransaction(TransactionInfo info) throws Exception {
+        if (trackTransactions && info != null) {
+            ConnectionId connectionId = info.getConnectionId();
+            if (connectionId != null) {
+                org.apache.activemq.apollo.openwire.support.state.ConnectionState cs = connectionStates.get(connectionId);
+                if (cs != null) {
+                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+                    if (transactionState != null) {
+                        transactionState.addCommand(info);
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+        return null;
+    }
+
+    public boolean isRestoreConsumers() {
+        return restoreConsumers;
+    }
+
+    public void setRestoreConsumers(boolean restoreConsumers) {
+        this.restoreConsumers = restoreConsumers;
+    }
+
+    public boolean isRestoreProducers() {
+        return restoreProducers;
+    }
+
+    public void setRestoreProducers(boolean restoreProducers) {
+        this.restoreProducers = restoreProducers;
+    }
+
+    public boolean isRestoreSessions() {
+        return restoreSessions;
+    }
+
+    public void setRestoreSessions(boolean restoreSessions) {
+        this.restoreSessions = restoreSessions;
+    }
+
+    public boolean isTrackTransactions() {
+        return trackTransactions;
+    }
+
+    public void setTrackTransactions(boolean trackTransactions) {
+        this.trackTransactions = trackTransactions;
+    }
+
+    public boolean isRestoreTransaction() {
+        return restoreTransaction;
+    }
+
+    public void setRestoreTransaction(boolean restoreTransaction) {
+        this.restoreTransaction = restoreTransaction;
+    }
+
+    public boolean isTrackMessages() {
+        return trackMessages;
+    }
+
+    public void setTrackMessages(boolean trackMessages) {
+        this.trackMessages = trackMessages;
+    }
+
+    public int getMaxCacheSize() {
+        return maxCacheSize;
+    }
+
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support.state;
+
+import org.apache.activemq.apollo.openwire.command.ConsumerInfo;
+
+public class ConsumerState {        
+    final ConsumerInfo info;
+    
+    public ConsumerState(ConsumerInfo info) {
+        this.info = info;
+    }        
+    public String toString() {
+        return info.toString();
+    }        
+    public ConsumerInfo getInfo() {
+        return info;
+    }        
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support.state;
+
+import org.apache.activemq.apollo.openwire.command.ProducerInfo;
+
+public class ProducerState {
+    final ProducerInfo info;
+
+    public ProducerState(ProducerInfo info) {
+        this.info = info;
+    }
+
+    public String toString() {
+        return info.toString();
+    }
+
+    public ProducerInfo getInfo() {
+        return info;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.support.state;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.apollo.openwire.command.ConsumerId;
+import org.apache.activemq.apollo.openwire.command.ConsumerInfo;
+import org.apache.activemq.apollo.openwire.command.ProducerId;
+import org.apache.activemq.apollo.openwire.command.ProducerInfo;
+import org.apache.activemq.apollo.openwire.command.SessionInfo;
+
+public class SessionState {
+    final SessionInfo info;
+
+    private final Map<ProducerId, org.apache.activemq.apollo.openwire.support.state.ProducerState> producers = new ConcurrentHashMap<ProducerId, org.apache.activemq.apollo.openwire.support.state.ProducerState>();
+    private final Map<ConsumerId, org.apache.activemq.apollo.openwire.support.state.ConsumerState> consumers = new ConcurrentHashMap<ConsumerId, org.apache.activemq.apollo.openwire.support.state.ConsumerState>();
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+    public SessionState(SessionInfo info) {
+        this.info = info;
+    }
+
+    public String toString() {
+        return info.toString();
+    }
+
+    public void addProducer(ProducerInfo info) {
+        checkShutdown();
+        producers.put(info.getProducerId(), new org.apache.activemq.apollo.openwire.support.state.ProducerState(info));
+    }
+
+    public org.apache.activemq.apollo.openwire.support.state.ProducerState removeProducer(ProducerId id) {
+        return producers.remove(id);
+    }
+
+    public void addConsumer(ConsumerInfo info) {
+        checkShutdown();
+        consumers.put(info.getConsumerId(), new org.apache.activemq.apollo.openwire.support.state.ConsumerState(info));
+    }
+
+    public org.apache.activemq.apollo.openwire.support.state.ConsumerState removeConsumer(ConsumerId id) {
+        return consumers.remove(id);
+    }
+
+    public SessionInfo getInfo() {
+        return info;
+    }
+
+    public Set<ConsumerId> getConsumerIds() {
+        return consumers.keySet();
+    }
+
+    public Set<ProducerId> getProducerIds() {
+        return producers.keySet();
+    }
+
+    public Collection<org.apache.activemq.apollo.openwire.support.state.ProducerState> getProducerStates() {
+        return producers.values();
+    }
+
+    public org.apache.activemq.apollo.openwire.support.state.ProducerState getProducerState(ProducerId producerId) {
+        return producers.get(producerId);
+    }
+
+    public Collection<org.apache.activemq.apollo.openwire.support.state.ConsumerState> getConsumerStates() {
+        return consumers.values();
+    }
+
+    public org.apache.activemq.apollo.openwire.support.state.ConsumerState getConsumerState(ConsumerId consumerId) {
+        return consumers.get(consumerId);
+    }
+
+    private void checkShutdown() {
+        if (shutdown.get()) {
+            throw new IllegalStateException("Disposed");
+        }
+    }
+
+    public void shutdown() {
+        shutdown.set(false);
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.state;
+
+import org.apache.activemq.apollo.openwire.command.Response;
+
+public class Tracked extends Response {
+
+    private Runnable runnable;
+
+    public Tracked(Runnable runnable) {
+        this.runnable = runnable;
+    }
+
+    public void onResponses() {
+        if (runnable != null) {
+            runnable.run();
+            runnable = null;
+        }
+    }
+
+    public boolean isWaitingForResponse() {
+        return runnable != null;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.support.state;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.apollo.openwire.command.Command;
+import org.apache.activemq.apollo.openwire.command.TransactionId;
+
+public class TransactionState {
+
+    private final List<Command> commands = new ArrayList<Command>();
+    private final TransactionId id;
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+    private boolean prepared;
+    private int preparedResult;
+
+    public TransactionState(TransactionId id) {
+        this.id = id;
+    }
+
+    public String toString() {
+        return id.toString();
+    }
+
+    public void addCommand(Command operation) {
+        checkShutdown();
+        commands.add(operation);
+    }
+
+    public List<Command> getCommands() {
+        return commands;
+    }
+
+    private void checkShutdown() {
+        if (shutdown.get()) {
+            throw new IllegalStateException("Disposed");
+        }
+    }
+
+    public void shutdown() {
+        shutdown.set(false);
+    }
+
+    public TransactionId getId() {
+        return id;
+    }
+
+    public void setPrepared(boolean prepared) {
+        this.prepared = prepared;
+    }
+
+    public boolean isPrepared() {
+        return prepared;
+    }
+
+    public void setPreparedResult(int preparedResult) {
+        this.preparedResult = preparedResult;
+    }
+
+    public int getPreparedResult() {
+        return preparedResult;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties Wed Apr 27 17:32:51 2011
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=TRACE
+
+# Console will only display warnnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/test/ide-resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml Wed Apr 27 17:32:51 2011
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+
+    <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+    <virtual_host id="default" purge_on_startup="true" auto_create_queues="true" >
+        <host_name>localhost</host_name>
+    </virtual_host>
+
+    <connector id="tcp" protocol="openwire" bind="tcp://[::]:0"/>
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties Wed Apr 27 17:32:51 2011
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=TRACE
+
+# Console will only display warnnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala Wed Apr 27 17:32:51 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+/**
+ * Copyright (C) 2010, FuseSource Corp. All rights reserved.
+ */
+import java.lang.String
+import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
+import org.apache.activemq.apollo.util.{Logging, FunSuiteSupport, ServiceControl}
+import javax.jms._
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class OpenwireTest extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging {
+
+  var broker: Broker = null
+  var port = 0
+
+  val broker_config_uri = "xml:classpath:apollo-openwire.xml"
+
+  override protected def beforeAll() = {
+    info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
+    broker = BrokerFactory.createBroker(broker_config_uri)
+    ServiceControl.start(broker, "Starting broker")
+    port = broker.get_socket_address.getPort
+  }
+
+  var default_connection:Connection = _
+  var connections = List[Connection]()
+
+  override protected def afterAll() = {
+    broker.stop
+  }
+
+  override protected def afterEach() = {
+    super.afterEach
+    connections.foreach(_.close)
+    connections = Nil
+    default_connection = null
+  }
+
+  def create_connection_factory = new ActiveMQConnectionFactory("tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port))
+  def create_connection: Connection = create_connection_factory.createConnection
+  def queue(value:String) = new ActiveMQQueue(value);
+  def topic(value:String) = new ActiveMQTopic(value);
+
+  def connect(start:Boolean=true) = {
+    val connection = create_connection
+    connections ::= connection
+    if(default_connection==null) {
+      default_connection = connection
+    }
+    if( start ) {
+      connection.start
+    }
+    connection
+  }
+
+  test("Queue order preserved") {x}
+  def x = {
+    println("connecting")
+    connect()
+    println("connectted")
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("example"))
+    def put(id:Int) = {
+      producer.send(session.createTextMessage("message:"+id))
+    }
+
+    List(1,2,3).foreach(put _)
+
+    val consumer = session.createConsumer(queue("example"))
+
+    def get(id:Int) = {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(queue("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    List(1,2,3).foreach(get _)
+  }
+
+  test("Topic drops messages sent before before subscription is established") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(topic("example"))
+    def put(id:Int) = {
+      producer.send(session.createTextMessage("message:"+id))
+    }
+
+    put(1)
+
+    val consumer = session.createConsumer(topic("example"))
+
+    put(2)
+    put(3)
+
+    def get(id:Int) = {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(topic("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    List(2,3).foreach(get _)
+  }
+
+
+  test("Topic /w Durable sub retains messages.") {
+
+    def create_durable_sub() = {
+      val connection = connect(false)
+      connection.setClientID("test")
+      connection.start
+      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val subscriber = session.createDurableSubscriber(topic("example"), "test")
+      session.close
+      connection.close
+      if (default_connection == connection) {
+        default_connection = null
+      }
+    }
+
+    create_durable_sub
+
+    connect(false)
+    default_connection.setClientID("test")
+    default_connection.start
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(topic("example"))
+    def put(id:Int) = {
+      producer.send(session.createTextMessage("message:"+id))
+    }
+
+    List(1,2,3).foreach(put _)
+
+    val subscriber = session.createDurableSubscriber(topic("example"), "test")
+
+    def get(id:Int) = {
+      val m = subscriber.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(topic("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    List(1,2,3).foreach(get _)
+  }
+
+  test("Queue and a selector") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("example"))
+
+    def put(id:Int, color:String) = {
+      val message = session.createTextMessage("message:" + id)
+      message.setStringProperty("color", color)
+      producer.send(message)
+    }
+
+    List((1, "red"), (2, "blue"), (3, "red")).foreach {
+      case (id, color) => put(id, color)
+    }
+
+    val consumer = session.createConsumer(queue("example"), "color='red'")
+
+    def get(id:Int) = {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(queue("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    get(1)
+    get(3)
+  }
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/codec/BooleanStreamTest.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.codec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+/**
+ */
+public class BooleanStreamTest extends TestCase {
+
+    protected OpenWireFormat openWireformat;
+    protected int endOfStreamMarker = 0x12345678;
+    int numberOfBytes = 8 * 200;
+
+    interface BooleanValueSet {
+        boolean getBooleanValueFor(int index, int count);
+    }
+
+    public void testBooleanMarshallingUsingAllTrue() throws Exception {
+        testBooleanStream(numberOfBytes, new BooleanValueSet() {
+            public boolean getBooleanValueFor(int index, int count) {
+                return true;
+            }
+        });
+    }
+
+    public void testBooleanMarshallingUsingAllFalse() throws Exception {
+        testBooleanStream(numberOfBytes, new BooleanValueSet() {
+            public boolean getBooleanValueFor(int index, int count) {
+                return false;
+            }
+        });
+    }
+
+    public void testBooleanMarshallingUsingOddAlternateTrueFalse() throws Exception {
+        testBooleanStream(numberOfBytes, new BooleanValueSet() {
+            public boolean getBooleanValueFor(int index, int count) {
+                return (index & 1) == 0;
+            }
+        });
+    }
+
+    public void testBooleanMarshallingUsingEvenAlternateTrueFalse() throws Exception {
+        testBooleanStream(numberOfBytes, new BooleanValueSet() {
+            public boolean getBooleanValueFor(int index, int count) {
+                return (index & 1) != 0;
+            }
+        });
+    }
+
+    protected void testBooleanStream(int numberOfBytes, BooleanValueSet valueSet) throws Exception {
+        for (int i = 0; i < numberOfBytes; i++) {
+            try {
+                assertMarshalBooleans(i, valueSet);
+            } catch (Throwable e) {
+                throw (AssertionFailedError)new AssertionFailedError("Iteration failed at: " + i).initCause(e);
+            }
+        }
+    }
+
+    protected void assertMarshalBooleans(int count, BooleanValueSet valueSet) throws Exception {
+        BooleanStream bs = new BooleanStream();
+        for (int i = 0; i < count; i++) {
+            bs.writeBoolean(valueSet.getBooleanValueFor(i, count));
+        }
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream ds = new DataOutputStream(buffer);
+        bs.marshal(ds);
+        ds.writeInt(endOfStreamMarker);
+
+        // now lets read from the stream
+        ds.close();
+
+        ByteArrayInputStream in = new ByteArrayInputStream(buffer.toByteArray());
+        DataInputStream dis = new DataInputStream(in);
+        bs = new BooleanStream();
+        try {
+            bs.unmarshal(dis);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail("Failed to unmarshal: " + count + " booleans: " + e);
+        }
+
+        for (int i = 0; i < count; i++) {
+            boolean expected = valueSet.getBooleanValueFor(i, count);
+            // /System.out.println("Unmarshaling value: " + i + " = " + expected
+            // + " out of: " + count);
+
+            try {
+                boolean actual = bs.readBoolean();
+                assertEquals("value of object: " + i + " was: " + actual, expected, actual);
+            } catch (IOException e) {
+                e.printStackTrace();
+                fail("Failed to parse boolean: " + i + " out of: " + count + " due to: " + e);
+            }
+        }
+        int marker = dis.readInt();
+        assertEquals("Marker int when unmarshalling: " + count + " booleans", Integer.toHexString(endOfStreamMarker), Integer.toHexString(marker));
+
+        // lets try read and we should get an exception
+        try {
+            byte value = dis.readByte();
+            fail("Should have reached the end of the stream");
+        } catch (IOException e) {
+            // worked!
+        }
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        openWireformat = createOpenWireFormat();
+    }
+
+    protected OpenWireFormat createOpenWireFormat() {
+        OpenWireFormat wf = new OpenWireFormat();
+        wf.setCacheEnabled(true);
+        wf.setStackTraceEnabled(false);
+        wf.setVersion(1);
+        return wf;
+    }
+
+}

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1097189&r1=1097188&r2=1097189&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Apr 27 17:32:51 2011
@@ -498,6 +498,7 @@
       <modules>
         <module>apollo-cassandra</module>
         <module>apollo-hawtdb</module>
+        <module>apollo-openwire</module>
       </modules>
     </profile>
     



Mime
View raw message