From commits-return-17300-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Nov 22 20:45:26 2011 Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 040399907 for ; Tue, 22 Nov 2011 20:45:26 +0000 (UTC) Received: (qmail 24640 invoked by uid 500); 22 Nov 2011 20:45:25 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 24614 invoked by uid 500); 22 Nov 2011 20:45:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 24607 invoked by uid 99); 22 Nov 2011 20:45:25 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Nov 2011 20:45:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Nov 2011 20:45:15 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0425E2388860 for ; Tue, 22 Nov 2011 20:44:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1205157 [1/2] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/State/ test/csharp/Transport/failover/ Date: Tue, 22 Nov 2011 20:44:52 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111122204453.0425E2388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Tue Nov 22 20:44:50 2011 New Revision: 1205157 URL: http://svn.apache.org/viewvc?rev=1205157&view=rev Log: Fix for: https://issues.apache.org/jira/browse/AMQNET-353 Adds additional test case for this. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Tue Nov 22 20:44:50 2011 @@ -1,204 +1,200 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Generic; -using Apache.NMS.ActiveMQ.Commands; -using Apache.NMS.Util; - -namespace Apache.NMS.ActiveMQ.State -{ - public class ConnectionState - { - - ConnectionInfo info; - private readonly AtomicDictionary transactions = new AtomicDictionary(); - private readonly AtomicDictionary sessions = new AtomicDictionary(); - private readonly AtomicCollection tempDestinations = new AtomicCollection(); - private readonly Atomic _shutdown = new Atomic(false); - private bool connectionInterruptProcessingComplete = true; - private readonly Dictionary recoveringPullConsumers = new Dictionary(); - - public ConnectionState(ConnectionInfo info) - { - this.info = info; - // Add the default session id. - addSession(new SessionInfo(info, -1)); - } - - public override String ToString() - { - return info.ToString(); - } - - public void reset(ConnectionInfo info) - { - this.info = info; - transactions.Clear(); - sessions.Clear(); - tempDestinations.Clear(); - _shutdown.Value = false; - } - - public void addTempDestination(DestinationInfo info) - { - checkShutdown(); - tempDestinations.Add(info); - } - - public void removeTempDestination(IDestination destination) - { - for(int i = tempDestinations.Count - 1; i >= 0; i--) - { - DestinationInfo di = tempDestinations[i]; - if(di.Destination.Equals(destination)) - { - tempDestinations.RemoveAt(i); - } - } - } - - public void addTransactionState(TransactionId id) - { - checkShutdown(); - transactions.Add(id, new TransactionState(id)); - } - - public TransactionState this[TransactionId id] - { - get - { - TransactionState state; - - if(transactions.TryGetValue(id, out state)) - { - return state; - } - - return null; - } - } - - public AtomicCollection TransactionStates - { - get - { - return transactions.Values; - } - } - - public SessionState this[SessionId id] - { - get - { - SessionState sessionState; - - if(sessions.TryGetValue(id, out sessionState)) - { - return sessionState; - } - -#if DEBUG - // Useful for dignosing missing session ids - string sessionList = string.Empty; - foreach(SessionId sessionId in sessions.Keys) - { - sessionList += sessionId.ToString() + "\n"; - } - - System.Diagnostics.Debug.Assert(false, - string.Format("Session '{0}' did not exist in the sessions collection.\n\nSessions:-\n{1}", id, sessionList)); -#endif - return null; - } - } - - public TransactionState removeTransactionState(TransactionId id) - { - TransactionState ret = null; - - transactions.TryGetValue(id, out ret); - transactions.Remove(id); - return ret; - } - - public void addSession(SessionInfo info) - { - checkShutdown(); - sessions.Add(info.SessionId, new SessionState(info)); - } - - public SessionState removeSession(SessionId id) - { - SessionState ret = null; - - sessions.TryGetValue(id, out ret); - sessions.Remove(id); - return ret; - } - - public ConnectionInfo Info - { - get { return info; } - } - - public AtomicCollection SessionIds - { - get { return sessions.Keys; } - } - - public AtomicCollection TempDestinations - { - get { return tempDestinations; } - } - - public AtomicCollection SessionStates - { - get { return sessions.Values; } - } - - private void checkShutdown() - { - if(_shutdown.Value) - { - throw new ApplicationException("Disposed"); - } - } - - public Dictionary RecoveringPullConsumers - { - get { return this.recoveringPullConsumers; } - } - - public bool ConnectionInterruptProcessingComplete - { - get { return this.connectionInterruptProcessingComplete; } - set { this.connectionInterruptProcessingComplete = value; } - } - - public void shutdown() - { - if(_shutdown.CompareAndSet(false, true)) - { - foreach(SessionState ss in sessions.Values) - { - ss.shutdown(); - } - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.Util; + +namespace Apache.NMS.ActiveMQ.State +{ + public class ConnectionState + { + private ConnectionInfo info; + private readonly AtomicDictionary transactions = new AtomicDictionary(); + private readonly AtomicDictionary sessions = new AtomicDictionary(); + private readonly AtomicCollection tempDestinations = new AtomicCollection(); + private readonly Atomic _shutdown = new Atomic(false); + private bool connectionInterruptProcessingComplete = true; + private readonly Dictionary recoveringPullConsumers = new Dictionary(); + + public ConnectionState(ConnectionInfo info) + { + this.info = info; + // Add the default session id. + addSession(new SessionInfo(info, -1)); + } + + public override String ToString() + { + return info.ToString(); + } + + public void reset(ConnectionInfo info) + { + this.info = info; + transactions.Clear(); + sessions.Clear(); + tempDestinations.Clear(); + _shutdown.Value = false; + } + + public void addTempDestination(DestinationInfo info) + { + checkShutdown(); + tempDestinations.Add(info); + } + + public void removeTempDestination(IDestination destination) + { + for(int i = tempDestinations.Count - 1; i >= 0; i--) + { + DestinationInfo di = tempDestinations[i]; + if(di.Destination.Equals(destination)) + { + tempDestinations.RemoveAt(i); + } + } + } + + public void addTransactionState(TransactionId id) + { + checkShutdown(); + transactions.Add(id, new TransactionState(id)); + } + + public TransactionState this[TransactionId id] + { + get + { + TransactionState state; + + if(transactions.TryGetValue(id, out state)) + { + return state; + } + + return null; + } + } + + public AtomicCollection TransactionStates + { + get { return transactions.Values; } + } + + public SessionState this[SessionId id] + { + get + { + SessionState sessionState; + + if(sessions.TryGetValue(id, out sessionState)) + { + return sessionState; + } + +#if DEBUG + // Useful for dignosing missing session ids + string sessionList = string.Empty; + foreach(SessionId sessionId in sessions.Keys) + { + sessionList += sessionId.ToString() + "\n"; + } + + System.Diagnostics.Debug.Assert(false, + string.Format("Session '{0}' did not exist in the sessions collection.\n\nSessions:-\n{1}", id, sessionList)); +#endif + return null; + } + } + + public TransactionState removeTransactionState(TransactionId id) + { + TransactionState ret = null; + + transactions.TryGetValue(id, out ret); + transactions.Remove(id); + return ret; + } + + public void addSession(SessionInfo info) + { + checkShutdown(); + sessions.Add(info.SessionId, new SessionState(info)); + } + + public SessionState removeSession(SessionId id) + { + SessionState ret = null; + + sessions.TryGetValue(id, out ret); + sessions.Remove(id); + return ret; + } + + public ConnectionInfo Info + { + get { return info; } + } + + public AtomicCollection SessionIds + { + get { return sessions.Keys; } + } + + public AtomicCollection TempDestinations + { + get { return tempDestinations; } + } + + public AtomicCollection SessionStates + { + get { return sessions.Values; } + } + + private void checkShutdown() + { + if(_shutdown.Value) + { + throw new ApplicationException("Disposed"); + } + } + + public Dictionary RecoveringPullConsumers + { + get { return this.recoveringPullConsumers; } + } + + public bool ConnectionInterruptProcessingComplete + { + get { return this.connectionInterruptProcessingComplete; } + set { this.connectionInterruptProcessingComplete = value; } + } + + public void shutdown() + { + if(_shutdown.CompareAndSet(false, true)) + { + foreach(SessionState ss in sessions.Values) + { + ss.shutdown(); + } + } + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1205157&r1=1205156&r2=1205157&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Tue Nov 22 20:44:50 2011 @@ -1,761 +1,762 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Generic; -using Apache.NMS.ActiveMQ.Commands; -using Apache.NMS.ActiveMQ.Transport; -using System.Collections; - -namespace Apache.NMS.ActiveMQ.State -{ - /// - /// Tracks the state of a connection so a newly established transport can be - /// re-initialized to the state that was tracked. - /// - public class ConnectionStateTracker : CommandVisitorAdapter - { - private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); - - protected Dictionary connectionStates = new Dictionary(); - - private bool _trackTransactions; - private bool _restoreSessions = true; - private bool _restoreConsumers = true; - private bool _restoreProducers = true; - private bool _restoreTransaction = true; - private bool _trackMessages = true; - private int _maxCacheSize = 256; - private int currentCacheSize; - private readonly Dictionary messageCache = new Dictionary(); - private readonly Queue messageCacheFIFO = new Queue(); - - protected void RemoveEldestInCache() - { - System.Collections.ICollection ic = messageCacheFIFO; - lock(ic.SyncRoot) - { - while(messageCacheFIFO.Count > MaxCacheSize) - { - messageCache.Remove(messageCacheFIFO.Dequeue()); - currentCacheSize = currentCacheSize - 1; - } - } - } - - private class RemoveTransactionAction : ThreadSimulator - { - private readonly TransactionInfo info; - private readonly ConnectionStateTracker cst; - - public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst) - { - this.info = info; - this.cst = aCst; - } - - public override void Run() - { - ConnectionState cs; - - if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs)) - { - cs.removeTransactionState(info.TransactionId); - } - } - } - - /// - /// - /// - /// null if the command is not state tracked. - public Tracked Track(Command command) - { - try - { - return (Tracked) command.visit(this); - } - catch(IOException e) - { - throw e; - } - catch(Exception e) - { - throw new IOException(e.Message); - } - } - - public void TrackBack(Command command) - { - if(TrackMessages && command != null && command.IsMessage) - { - Message message = (Message) command; - if(message.TransactionId == null) - { - currentCacheSize = currentCacheSize + 1; - } - } - } - - public void DoRestore(ITransport transport) - { - // Restore the connections. - foreach(ConnectionState connectionState in connectionStates.Values) - { - ConnectionInfo info = connectionState.Info; - info.FailoverReconnect = true; - transport.Oneway(info); - - DoRestoreTempDestinations(transport, connectionState); - - if(RestoreSessions) - { - DoRestoreSessions(transport, connectionState); - } - - if(RestoreTransaction) - { - DoRestoreTransactions(transport, connectionState); - } - } - //now flush messages - foreach(Message msg in messageCache.Values) - { - transport.Oneway(msg); - } - } - - private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState) - { - AtomicCollection transactionStates = connectionState.TransactionStates; - foreach(TransactionState transactionState in transactionStates) - { - foreach(Command command in transactionState.Commands) - { - transport.Oneway(command); - } - } - } - - /// - /// - /// - /// - protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState) - { - // Restore the connection's sessions - foreach(SessionState sessionState in connectionState.SessionStates) - { - transport.Oneway(sessionState.Info); - - if(RestoreProducers) - { - DoRestoreProducers(transport, sessionState); - } - - if(RestoreConsumers) - { - DoRestoreConsumers(transport, sessionState); - } - } - } - - /// - /// - /// - /// - protected void DoRestoreConsumers(ITransport transport, SessionState sessionState) - { - // Restore the session's consumers but possibly in pull only (prefetch 0 state) till - // recovery completes. - - ConnectionState connectionState = null; - bool connectionInterruptionProcessingComplete = false; - - if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState)) - { - connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete; - } - - // Restore the session's consumers - foreach(ConsumerState consumerState in sessionState.ConsumerStates) - { - ConsumerInfo infoToSend = consumerState.Info; - - if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5) - { - infoToSend = consumerState.Info.Clone() as ConsumerInfo; - lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) - { - if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId)) - { - connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info); - } - } - infoToSend.PrefetchSize = 0; - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("restore consumer: " + infoToSend.ConsumerId + - " in pull mode pending recovery, overriding prefetch: " + - consumerState.Info.PrefetchSize); - } - } - - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("restore consumer: " + infoToSend.ConsumerId); - } - - transport.Oneway(infoToSend); - } - } - - /// - /// - /// - /// - protected void DoRestoreProducers(ITransport transport, SessionState sessionState) - { - // Restore the session's producers - foreach(ProducerState producerState in sessionState.ProducerStates) - { - transport.Oneway(producerState.Info); - } - } - - /// - /// - /// - /// - protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState) - { - // Restore the connection's temp destinations. - foreach(DestinationInfo destinationInfo in connectionState.TempDestinations) - { - transport.Oneway(destinationInfo); - } - } - - public override Response processAddDestination(DestinationInfo info) - { - if(info != null && info.Destination.IsTemporary) - { - ConnectionState cs; - - if(connectionStates.TryGetValue(info.ConnectionId, out cs)) - { - cs.addTempDestination(info); - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveDestination(DestinationInfo info) - { - if(info != null && info.Destination.IsTemporary) - { - ConnectionState cs; - if(connectionStates.TryGetValue(info.ConnectionId, out cs)) - { - cs.removeTempDestination(info.Destination); - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddProducer(ProducerInfo info) - { - if(info != null && info.ProducerId != null) - { - SessionId sessionId = info.ProducerId.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.addProducer(info); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveProducer(ProducerId id) - { - if(id != null) - { - SessionId sessionId = id.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.removeProducer(id); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddConsumer(ConsumerInfo info) - { - if(info != null) - { - SessionId sessionId = info.ConsumerId.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.addConsumer(info); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveConsumer(ConsumerId id) - { - if(id != null) - { - SessionId sessionId = id.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.removeConsumer(id); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddSession(SessionInfo info) - { - if(info != null) - { - ConnectionId connectionId = info.SessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - cs.addSession(info); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveSession(SessionId id) - { - if(id != null) - { - ConnectionId connectionId = id.ParentId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - cs.removeSession(id); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddConnection(ConnectionInfo info) - { - if(info != null) - { - connectionStates.Add(info.ConnectionId, new ConnectionState(info)); - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveConnection(ConnectionId id) - { - if(id != null) - { - connectionStates.Remove(id); - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processMessage(Message send) - { - if(send != null) - { - if(TrackTransactions && send.TransactionId != null) - { - ConnectionId connectionId = send.ProducerId.ParentId.ParentId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - TransactionState transactionState = cs[send.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(send); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - else if(TrackMessages) - { - messageCache.Add(send.MessageId, (Message) send.Clone()); - RemoveEldestInCache(); - } - } - return null; - } - - public override Response processMessageAck(MessageAck ack) - { - if(TrackTransactions && ack != null && ack.TransactionId != null) - { - ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - TransactionState transactionState = cs[ack.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(ack); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public override Response processBeginTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null && info.TransactionId != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - cs.addTransactionState(info.TransactionId); - TransactionState state = cs[info.TransactionId]; - state.addCommand(info); - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public override Response processPrepareTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public override Response processCommitTransactionOnePhase(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - return new Tracked(new RemoveTransactionAction(info, this)); - } - } - } - } - return null; - } - - public override Response processCommitTransactionTwoPhase(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(cs != null) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - return new Tracked(new RemoveTransactionAction(info, this)); - } - } - } - } - return null; - } - - public override Response processRollbackTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - return new Tracked(new RemoveTransactionAction(info, this)); - } - } - } - } - return null; - } - - public override Response processEndTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public bool RestoreConsumers - { - get - { - return _restoreConsumers; - } - set - { - _restoreConsumers = value; - } - } - - public bool RestoreProducers - { - get - { - return _restoreProducers; - } - set - { - _restoreProducers = value; - } - } - - public bool RestoreSessions - { - get - { - return _restoreSessions; - } - set - { - _restoreSessions = value; - } - } - - public bool TrackTransactions - { - get - { - return _trackTransactions; - } - set - { - _trackTransactions = value; - } - } - - public bool RestoreTransaction - { - get - { - return _restoreTransaction; - } - set - { - _restoreTransaction = value; - } - } - - public bool TrackMessages - { - get - { - return _trackMessages; - } - set - { - _trackMessages = value; - } - } - - public int MaxCacheSize - { - get - { - return _maxCacheSize; - } - set - { - _maxCacheSize = value; - } - } - - public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId) - { - ConnectionState connectionState = null; - - if(connectionStates.TryGetValue(connectionId, out connectionState)) - { - connectionState.ConnectionInterruptProcessingComplete = true; - - lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) - { - foreach(KeyValuePair entry in connectionState.RecoveringPullConsumers) - { - ConsumerControl control = new ConsumerControl(); - control.ConsumerId = entry.Key; - control.Prefetch = entry.Value.PrefetchSize; - control.Destination = entry.Value.Destination; - try - { - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("restored recovering consumer: " + control.ConsumerId + - " with: " + control.Prefetch); - } - transport.Oneway(control); - } - catch(Exception ex) - { - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId + - " with: " + control.Prefetch + "Error: " + ex.Message); - } - } - } - connectionState.RecoveringPullConsumers.Clear(); - } - } - } - - public void TransportInterrupted(ConnectionId id) - { - ConnectionState connection = null; - - if(connectionStates.TryGetValue(id, out connection)) - { - connection.ConnectionInterruptProcessingComplete = false; - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.ActiveMQ.Transport; +using System.Collections; + +namespace Apache.NMS.ActiveMQ.State +{ + /// + /// Tracks the state of a connection so a newly established transport can be + /// re-initialized to the state that was tracked. + /// + public class ConnectionStateTracker : CommandVisitorAdapter + { + private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); + + protected Dictionary connectionStates = new Dictionary(); + + private bool _trackTransactions; + private bool _trackTransactionProducers = true; + private bool _restoreSessions = true; + private bool _restoreConsumers = true; + private bool _restoreProducers = true; + private bool _restoreTransaction = true; + private bool _trackMessages = true; + private int _maxCacheSize = 256; + private int currentCacheSize; + private readonly Dictionary messageCache = new Dictionary(); + private readonly Queue messageCacheFIFO = new Queue(); + + protected void RemoveEldestInCache() + { + System.Collections.ICollection ic = messageCacheFIFO; + lock(ic.SyncRoot) + { + while(messageCacheFIFO.Count > MaxCacheSize) + { + messageCache.Remove(messageCacheFIFO.Dequeue()); + currentCacheSize = currentCacheSize - 1; + } + } + } + + private class RemoveTransactionAction : ThreadSimulator + { + private readonly TransactionInfo info; + private readonly ConnectionStateTracker cst; + + public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst) + { + this.info = info; + this.cst = aCst; + } + + public override void Run() + { + ConnectionState cs; + + if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs)) + { + cs.removeTransactionState(info.TransactionId); + } + } + } + + /// + /// + /// + /// null if the command is not state tracked. + public Tracked Track(Command command) + { + try + { + return (Tracked) command.visit(this); + } + catch(IOException e) + { + throw e; + } + catch(Exception e) + { + throw new IOException(e.Message); + } + } + + public void TrackBack(Command command) + { + if(TrackMessages && command != null && command.IsMessage) + { + Message message = (Message) command; + if(message.TransactionId == null) + { + currentCacheSize = currentCacheSize + 1; + } + } + } + + public void DoRestore(ITransport transport) + { + // Restore the connections. + foreach(ConnectionState connectionState in connectionStates.Values) + { + ConnectionInfo info = connectionState.Info; + info.FailoverReconnect = true; + transport.Oneway(info); + + DoRestoreTempDestinations(transport, connectionState); + + if(RestoreSessions) + { + DoRestoreSessions(transport, connectionState); + } + + if(RestoreTransaction) + { + DoRestoreTransactions(transport, connectionState); + } + } + //now flush messages + foreach(Message msg in messageCache.Values) + { + transport.Oneway(msg); + } + } + + private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState) + { + AtomicCollection transactionStates = connectionState.TransactionStates; + + foreach(TransactionState transactionState in transactionStates) + { + // replay the add and remove of short lived producers that may have been + // involved in the transaction + foreach (ProducerState producerState in transactionState.ProducerStates) + { + if (Tracer.IsDebugEnabled) + { + Tracer.Debug("tx replay producer :" + producerState.Info); + } + transport.Oneway(producerState.Info); + } + + foreach (Command command in transactionState.Commands) + { + if (Tracer.IsDebugEnabled) + { + Tracer.Debug("tx replay: " + command); + } + transport.Oneway(command); + } + + foreach (ProducerState producerState in transactionState.ProducerStates) + { + if (Tracer.IsDebugEnabled) + { + Tracer.Debug("tx remove replayed producer :" + producerState.Info); + } + + RemoveInfo producerRemove = new RemoveInfo(); + producerRemove.ObjectId = producerState.Info.ProducerId; + transport.Oneway(producerRemove); + } + } + } + + /// + /// + /// + /// + protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState) + { + // Restore the connection's sessions + foreach(SessionState sessionState in connectionState.SessionStates) + { + transport.Oneway(sessionState.Info); + + if(RestoreProducers) + { + DoRestoreProducers(transport, sessionState); + } + + if(RestoreConsumers) + { + DoRestoreConsumers(transport, sessionState); + } + } + } + + /// + /// + /// + /// + protected void DoRestoreConsumers(ITransport transport, SessionState sessionState) + { + // Restore the session's consumers but possibly in pull only (prefetch 0 state) till + // recovery completes. + + ConnectionState connectionState = null; + bool connectionInterruptionProcessingComplete = false; + + if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState)) + { + connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete; + } + + // Restore the session's consumers + foreach(ConsumerState consumerState in sessionState.ConsumerStates) + { + ConsumerInfo infoToSend = consumerState.Info; + + if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5) + { + infoToSend = consumerState.Info.Clone() as ConsumerInfo; + lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) + { + if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId)) + { + connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info); + } + } + infoToSend.PrefetchSize = 0; + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("restore consumer: " + infoToSend.ConsumerId + + " in pull mode pending recovery, overriding prefetch: " + + consumerState.Info.PrefetchSize); + } + } + + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("restore consumer: " + infoToSend.ConsumerId); + } + + transport.Oneway(infoToSend); + } + } + + /// + /// + /// + /// + protected void DoRestoreProducers(ITransport transport, SessionState sessionState) + { + // Restore the session's producers + foreach(ProducerState producerState in sessionState.ProducerStates) + { + transport.Oneway(producerState.Info); + } + } + + /// + /// + /// + /// + protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState) + { + // Restore the connection's temp destinations. + foreach(DestinationInfo destinationInfo in connectionState.TempDestinations) + { + transport.Oneway(destinationInfo); + } + } + + public override Response processAddDestination(DestinationInfo info) + { + if(info != null && info.Destination.IsTemporary) + { + ConnectionState cs; + + if(connectionStates.TryGetValue(info.ConnectionId, out cs)) + { + cs.addTempDestination(info); + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveDestination(DestinationInfo info) + { + if(info != null && info.Destination.IsTemporary) + { + ConnectionState cs; + if(connectionStates.TryGetValue(info.ConnectionId, out cs)) + { + cs.removeTempDestination(info.Destination); + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddProducer(ProducerInfo info) + { + if(info != null && info.ProducerId != null) + { + SessionId sessionId = info.ProducerId.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.addProducer(info); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveProducer(ProducerId id) + { + if(id != null) + { + SessionId sessionId = id.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.removeProducer(id); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddConsumer(ConsumerInfo info) + { + if(info != null) + { + SessionId sessionId = info.ConsumerId.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.addConsumer(info); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveConsumer(ConsumerId id) + { + if(id != null) + { + SessionId sessionId = id.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.removeConsumer(id); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddSession(SessionInfo info) + { + if(info != null) + { + ConnectionId connectionId = info.SessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + cs.addSession(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveSession(SessionId id) + { + if(id != null) + { + ConnectionId connectionId = id.ParentId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + cs.removeSession(id); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddConnection(ConnectionInfo info) + { + if(info != null) + { + connectionStates.Add(info.ConnectionId, new ConnectionState(info)); + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveConnection(ConnectionId id) + { + if(id != null) + { + connectionStates.Remove(id); + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processMessage(Message send) + { + if(send != null) + { + if(TrackTransactions && send.TransactionId != null) + { + ProducerId producerId = send.ProducerId; + ConnectionId connectionId = producerId.ParentId.ParentId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + TransactionState transactionState = cs[send.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(send); + + if (_trackTransactionProducers) + { + SessionState ss = cs[producerId.ParentId]; + ProducerState producerState = ss[producerId]; + producerState.TransactionState = transactionState; + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + else if(TrackMessages) + { + messageCache.Add(send.MessageId, (Message) send.Clone()); + RemoveEldestInCache(); + } + } + return null; + } + + public override Response processMessageAck(MessageAck ack) + { + if(TrackTransactions && ack != null && ack.TransactionId != null) + { + ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + TransactionState transactionState = cs[ack.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(ack); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public override Response processBeginTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null && info.TransactionId != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + cs.addTransactionState(info.TransactionId); + TransactionState state = cs[info.TransactionId]; + state.addCommand(info); + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public override Response processPrepareTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public override Response processCommitTransactionOnePhase(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info, this)); + } + } + } + } + return null; + } + + public override Response processCommitTransactionTwoPhase(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(cs != null) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info, this)); + } + } + } + } + return null; + } + + public override Response processRollbackTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info, this)); + } + } + } + } + return null; + } + + public override Response processEndTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public bool RestoreConsumers + { + get { return _restoreConsumers; } + set { _restoreConsumers = value; } + } + + public bool RestoreProducers + { + get { return _restoreProducers; } + set { _restoreProducers = value; } + } + + public bool RestoreSessions + { + get { return _restoreSessions; } + set { _restoreSessions = value; } + } + + public bool TrackTransactions + { + get { return _trackTransactions; } + set { _trackTransactions = value; } + } + + public bool TrackTransactionProducers + { + get { return _trackTransactionProducers; } + set { _trackTransactionProducers = value; } + } + + public bool RestoreTransaction + { + get { return _restoreTransaction; } + set { _restoreTransaction = value; } + } + + public bool TrackMessages + { + get { return _trackMessages; } + set { _trackMessages = value; } + } + + public int MaxCacheSize + { + get { return _maxCacheSize; } + set { _maxCacheSize = value; } + } + + public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId) + { + ConnectionState connectionState = null; + + if(connectionStates.TryGetValue(connectionId, out connectionState)) + { + connectionState.ConnectionInterruptProcessingComplete = true; + + lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) + { + foreach(KeyValuePair entry in connectionState.RecoveringPullConsumers) + { + ConsumerControl control = new ConsumerControl(); + control.ConsumerId = entry.Key; + control.Prefetch = entry.Value.PrefetchSize; + control.Destination = entry.Value.Destination; + try + { + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("restored recovering consumer: " + control.ConsumerId + + " with: " + control.Prefetch); + } + transport.Oneway(control); + } + catch(Exception ex) + { + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId + + " with: " + control.Prefetch + "Error: " + ex.Message); + } + } + } + connectionState.RecoveringPullConsumers.Clear(); + } + } + } + + public void TransportInterrupted(ConnectionId id) + { + ConnectionState connection = null; + + if(connectionStates.TryGetValue(id, out connection)) + { + connection.ConnectionInterruptProcessingComplete = false; + } + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs Tue Nov 22 20:44:50 2011 @@ -1,47 +1,43 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; - -using Apache.NMS.ActiveMQ.Commands; - -namespace Apache.NMS.ActiveMQ.State -{ - - public class ConsumerState - { - readonly ConsumerInfo info; - - public ConsumerState(ConsumerInfo info) - { - this.info = info; - } - - public override String ToString() - { - return info.ToString(); - } - - public ConsumerInfo Info - { - get - { - return info; - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; + +using Apache.NMS.ActiveMQ.Commands; + +namespace Apache.NMS.ActiveMQ.State +{ + public class ConsumerState + { + private readonly ConsumerInfo info; + + public ConsumerState(ConsumerInfo info) + { + this.info = info; + } + + public override String ToString() + { + return info.ToString(); + } + + public ConsumerInfo Info + { + get { return info; } + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs Tue Nov 22 20:44:50 2011 @@ -1,46 +1,50 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; - -using Apache.NMS.ActiveMQ.Commands; - -namespace Apache.NMS.ActiveMQ.State -{ - public class ProducerState - { - readonly ProducerInfo info; - - public ProducerState(ProducerInfo info) - { - this.info = info; - } - - public override String ToString() - { - return info.ToString(); - } - - public ProducerInfo Info - { - get - { - return info; - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; + +using Apache.NMS.ActiveMQ.Commands; + +namespace Apache.NMS.ActiveMQ.State +{ + public class ProducerState + { + private readonly ProducerInfo info; + private TransactionState transactionState; + + public ProducerState(ProducerInfo info) + { + this.info = info; + } + + public override String ToString() + { + return info.ToString(); + } + + public ProducerInfo Info + { + get { return info; } + } + + public TransactionState TransactionState + { + get { return this.transactionState; } + set { this.transactionState = value; } + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs Tue Nov 22 20:44:50 2011 @@ -1,148 +1,132 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using Apache.NMS.ActiveMQ.Commands; -using Apache.NMS.Util; - -namespace Apache.NMS.ActiveMQ.State -{ - public class SessionState - { - readonly SessionInfo info; - - private readonly AtomicDictionary producers = new AtomicDictionary(); - private readonly AtomicDictionary consumers = new AtomicDictionary(); - private readonly Atomic _shutdown = new Atomic(false); - - public SessionState(SessionInfo info) - { - this.info = info; - } - - public override String ToString() - { - return info.ToString(); - } - - public void addProducer(ProducerInfo info) - { - checkShutdown(); - producers.Add(info.ProducerId, new ProducerState(info)); - } - - public ProducerState removeProducer(ProducerId id) - { - ProducerState ret = producers[id]; - producers.Remove(id); - return ret; - } - - public void addConsumer(ConsumerInfo info) - { - checkShutdown(); - consumers.Add(info.ConsumerId, new ConsumerState(info)); - } - - public ConsumerState removeConsumer(ConsumerId id) - { - ConsumerState ret = consumers[id]; - consumers.Remove(id); - return ret; - } - - public SessionInfo Info - { - get - { - return info; - } - } - - public AtomicCollection ConsumerIds - { - get - { - return consumers.Keys; - } - } - - public AtomicCollection ProducerIds - { - get - { - return producers.Keys; - } - } - - public AtomicCollection ProducerStates - { - get - { - return producers.Values; - } - } - - public ProducerState getProducerState(ProducerId producerId) - { - return producers[producerId]; - } - - public ProducerState this[ProducerId producerId] - { - get - { - return producers[producerId]; - } - } - - public AtomicCollection ConsumerStates - { - get - { - return consumers.Values; - } - } - - public ConsumerState getConsumerState(ConsumerId consumerId) - { - return consumers[consumerId]; - } - - public ConsumerState this[ConsumerId consumerId] - { - get - { - return consumers[consumerId]; - } - } - - private void checkShutdown() - { - if(_shutdown.Value) - { - throw new ApplicationException("Disposed"); - } - } - - public void shutdown() - { - _shutdown.Value = true; - } - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.Util; + +namespace Apache.NMS.ActiveMQ.State +{ + public class SessionState + { + readonly SessionInfo info; + + private readonly AtomicDictionary producers = new AtomicDictionary(); + private readonly AtomicDictionary consumers = new AtomicDictionary(); + private readonly Atomic _shutdown = new Atomic(false); + + public SessionState(SessionInfo info) + { + this.info = info; + } + + public override String ToString() + { + return info.ToString(); + } + + public void addProducer(ProducerInfo info) + { + checkShutdown(); + producers.Add(info.ProducerId, new ProducerState(info)); + } + + public ProducerState removeProducer(ProducerId id) + { + ProducerState ret = producers[id]; + producers.Remove(id); + if (ret.TransactionState != null) + { + ret.TransactionState.AddProducer(ret); + } + + return ret; + } + + public void addConsumer(ConsumerInfo info) + { + checkShutdown(); + consumers.Add(info.ConsumerId, new ConsumerState(info)); + } + + public ConsumerState removeConsumer(ConsumerId id) + { + ConsumerState ret = consumers[id]; + consumers.Remove(id); + return ret; + } + + public SessionInfo Info + { + get { return info; } + } + + public AtomicCollection ConsumerIds + { + get { return consumers.Keys; } + } + + public AtomicCollection ProducerIds + { + get { return producers.Keys; } + } + + public AtomicCollection ProducerStates + { + get { return producers.Values; } + } + + public ProducerState getProducerState(ProducerId producerId) + { + return producers[producerId]; + } + + public ProducerState this[ProducerId producerId] + { + get { return producers[producerId]; } + } + + public AtomicCollection ConsumerStates + { + get { return consumers.Values; } + } + + public ConsumerState getConsumerState(ConsumerId consumerId) + { + return consumers[consumerId]; + } + + public ConsumerState this[ConsumerId consumerId] + { + get { return consumers[consumerId]; } + } + + private void checkShutdown() + { + if(_shutdown.Value) + { + throw new ApplicationException("Disposed"); + } + } + + public void shutdown() + { + _shutdown.Value = true; + } + + } +} Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs?rev=1205157&r1=1205156&r2=1205157&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs Tue Nov 22 20:44:50 2011 @@ -1,226 +1,222 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System.Collections; -using System.Collections.Generic; - -namespace Apache.NMS.ActiveMQ.State -{ - public class AtomicCollection - where TValue : class - { - private readonly ArrayList _collection = new ArrayList(); - - public AtomicCollection() - { - } - - public AtomicCollection(ICollection c) - { - lock(c.SyncRoot) - { - foreach(object obj in c) - { - _collection.Add(obj); - } - } - } - - public int Count - { - get - { - lock(_collection.SyncRoot) - { - return _collection.Count; - } - } - } - - public bool IsReadOnly - { - get - { - return false; - } - } - - public int Add(TValue v) - { - lock(_collection.SyncRoot) - { - return _collection.Add(v); - } - } - - public void Clear() - { - lock(_collection.SyncRoot) - { - _collection.Clear(); - } - } - - public bool Contains(TValue v) - { - lock(_collection.SyncRoot) - { - return _collection.Contains(v); - } - } - - public void CopyTo(TValue[] a, int index) - { - lock(_collection.SyncRoot) - { - _collection.CopyTo(a, index); - } - } - - public void Remove(TValue v) - { - lock(_collection.SyncRoot) - { - _collection.Remove(v); - } - } - - public void RemoveAt(int index) - { - lock(_collection.SyncRoot) - { - _collection.RemoveAt(index); - } - } - - public TValue this[int index] - { - get - { - TValue ret; - lock(_collection.SyncRoot) - { - ret = (TValue) _collection[index]; - } - return (TValue) ret; - } - set - { - lock(_collection.SyncRoot) - { - _collection[index] = value; - } - } - } - - public IEnumerator GetEnumerator() - { - lock(_collection.SyncRoot) - { - return _collection.GetEnumerator(); - } - } - -#if !NETCF - public IEnumerator GetEnumerator(int index, int count) - { - lock(_collection.SyncRoot) - { - return _collection.GetEnumerator(index, count); - } - } -#endif - } - - public class AtomicDictionary - where TKey : class - where TValue : class - { - private readonly Dictionary _dictionary = new Dictionary(); - - public void Clear() - { - _dictionary.Clear(); - } - - public TValue this[TKey key] - { - get - { - TValue ret; - lock(((ICollection) _dictionary).SyncRoot) - { - ret = _dictionary[key]; - } - return ret; - } - set - { - lock(((ICollection) _dictionary).SyncRoot) - { - _dictionary[key] = value; - } - } - } - - public bool TryGetValue(TKey key, out TValue val) - { - lock(((ICollection) _dictionary).SyncRoot) - { - return _dictionary.TryGetValue(key, out val); - } - } - - public AtomicCollection Keys - { - get - { - lock(((ICollection) _dictionary).SyncRoot) - { - return new AtomicCollection(_dictionary.Keys); - } - } - } - - public AtomicCollection Values - { - get - { - lock(((ICollection) _dictionary).SyncRoot) - { - return new AtomicCollection(_dictionary.Values); - } - } - } - - public void Add(TKey k, TValue v) - { - lock(((ICollection) _dictionary).SyncRoot) - { - _dictionary.Add(k, v); - } - } - - public bool Remove(TKey v) - { - lock(((ICollection) _dictionary).SyncRoot) - { - return _dictionary.Remove(v); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Collections; +using System.Collections.Generic; + +namespace Apache.NMS.ActiveMQ.State +{ + public class AtomicCollection where TValue : class + { + private readonly ArrayList _collection = new ArrayList(); + + public AtomicCollection() + { + } + + public AtomicCollection(ICollection c) + { + lock(c.SyncRoot) + { + foreach(object obj in c) + { + _collection.Add(obj); + } + } + } + + public int Count + { + get + { + lock(_collection.SyncRoot) + { + return _collection.Count; + } + } + } + + public bool IsReadOnly + { + get { return false; } + } + + public int Add(TValue v) + { + lock(_collection.SyncRoot) + { + return _collection.Add(v); + } + } + + public void Clear() + { + lock(_collection.SyncRoot) + { + _collection.Clear(); + } + } + + public bool Contains(TValue v) + { + lock(_collection.SyncRoot) + { + return _collection.Contains(v); + } + } + + public void CopyTo(TValue[] a, int index) + { + lock(_collection.SyncRoot) + { + _collection.CopyTo(a, index); + } + } + + public void Remove(TValue v) + { + lock(_collection.SyncRoot) + { + _collection.Remove(v); + } + } + + public void RemoveAt(int index) + { + lock(_collection.SyncRoot) + { + _collection.RemoveAt(index); + } + } + + public TValue this[int index] + { + get + { + TValue ret; + lock(_collection.SyncRoot) + { + ret = (TValue) _collection[index]; + } + return (TValue) ret; + } + set + { + lock(_collection.SyncRoot) + { + _collection[index] = value; + } + } + } + + public IEnumerator GetEnumerator() + { + lock(_collection.SyncRoot) + { + return _collection.GetEnumerator(); + } + } + +#if !NETCF + public IEnumerator GetEnumerator(int index, int count) + { + lock(_collection.SyncRoot) + { + return _collection.GetEnumerator(index, count); + } + } +#endif + } + + public class AtomicDictionary + where TKey : class + where TValue : class + { + private readonly Dictionary _dictionary = new Dictionary(); + + public void Clear() + { + _dictionary.Clear(); + } + + public TValue this[TKey key] + { + get + { + TValue ret; + lock(((ICollection) _dictionary).SyncRoot) + { + ret = _dictionary[key]; + } + return ret; + } + set + { + lock(((ICollection) _dictionary).SyncRoot) + { + _dictionary[key] = value; + } + } + } + + public bool TryGetValue(TKey key, out TValue val) + { + lock(((ICollection) _dictionary).SyncRoot) + { + return _dictionary.TryGetValue(key, out val); + } + } + + public AtomicCollection Keys + { + get + { + lock(((ICollection) _dictionary).SyncRoot) + { + return new AtomicCollection(_dictionary.Keys); + } + } + } + + public AtomicCollection Values + { + get + { + lock(((ICollection) _dictionary).SyncRoot) + { + return new AtomicCollection(_dictionary.Values); + } + } + } + + public void Add(TKey k, TValue v) + { + lock(((ICollection) _dictionary).SyncRoot) + { + _dictionary.Add(k, v); + } + } + + public bool Remove(TKey v) + { + lock(((ICollection) _dictionary).SyncRoot) + { + return _dictionary.Remove(v); + } + } + } +}