Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 16657 invoked from network); 8 Jul 2010 23:10:23 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 8 Jul 2010 23:10:23 -0000 Received: (qmail 46203 invoked by uid 500); 8 Jul 2010 23:10:23 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 46154 invoked by uid 500); 8 Jul 2010 23:10:23 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 46146 invoked by uid 99); 8 Jul 2010 23:10:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jul 2010 23:10:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jul 2010 23:10:17 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6382B2388999; Thu, 8 Jul 2010 23:09:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961976 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Commands/ConnectionInfo.cs Connection.cs OpenWire/V6/ConnectionInfoMarshaller.cs State/ConnectionStateTracker.cs Date: Thu, 08 Jul 2010 23:09:23 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100708230923.6382B2388999@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Thu Jul 8 23:09:22 2010 New Revision: 961976 URL: http://svn.apache.org/viewvc?rev=961976&view=rev Log: Update Commands and set Properties correctly. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs?rev=961976&r1=961975&r2=961976&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs Thu Jul 8 23:09:22 2010 @@ -44,6 +44,7 @@ namespace Apache.NMS.ActiveMQ.Commands bool manageable; bool clientMaster; bool faultTolerant; + bool failoverReconnect; /// /// @@ -75,7 +76,8 @@ namespace Apache.NMS.ActiveMQ.Commands "BrokerMasterConnector = " + BrokerMasterConnector + ", " + "Manageable = " + Manageable + ", " + "ClientMaster = " + ClientMaster + ", " + - "FaultTolerant = " + FaultTolerant + " ]"; + "FaultTolerant = " + FaultTolerant + ", " + + "FailoverReconnect = " + FailoverReconnect + " ]"; } public ConnectionId ConnectionId @@ -132,6 +134,12 @@ namespace Apache.NMS.ActiveMQ.Commands set { this.faultTolerant = value; } } + public bool FailoverReconnect + { + get { return failoverReconnect; } + set { this.failoverReconnect = value; } + } + /// /// /// Return an answer of true to the isConnectionInfo() query. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=961976&r1=961975&r2=961976&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Jul 8 23:09:22 2010 @@ -87,6 +87,7 @@ namespace Apache.NMS.ActiveMQ this.info = new ConnectionInfo(); this.info.ConnectionId = id; + this.info.FaultTolerant = transport.IsFaultTolerant; } ~Connection() @@ -930,7 +931,7 @@ namespace Apache.NMS.ActiveMQ } } } - + private void SignalInterruptionProcessingComplete() { CountDownLatch cdl = this.transportInterruptionProcessingComplete; @@ -952,7 +953,7 @@ namespace Apache.NMS.ActiveMQ ") of interruption completion for: " + this.info.ConnectionId); } } - + } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs?rev=961976&r1=961975&r2=961976&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs Thu Jul 8 23:09:22 2010 @@ -84,6 +84,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V info.Manageable = bs.ReadBoolean(); info.ClientMaster = bs.ReadBoolean(); info.FaultTolerant = bs.ReadBoolean(); + info.FailoverReconnect = bs.ReadBoolean(); } // @@ -103,6 +104,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V bs.WriteBoolean(info.Manageable); bs.WriteBoolean(info.ClientMaster); bs.WriteBoolean(info.FaultTolerant); + bs.WriteBoolean(info.FailoverReconnect); return rc + 0; } @@ -124,6 +126,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V bs.ReadBoolean(); bs.ReadBoolean(); bs.ReadBoolean(); + bs.ReadBoolean(); } // @@ -154,6 +157,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V info.Manageable = dataIn.ReadBoolean(); info.ClientMaster = dataIn.ReadBoolean(); info.FaultTolerant = dataIn.ReadBoolean(); + info.FailoverReconnect = dataIn.ReadBoolean(); } // @@ -174,6 +178,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V dataOut.Write(info.Manageable); dataOut.Write(info.ClientMaster); dataOut.Write(info.FaultTolerant); + dataOut.Write(info.FailoverReconnect); } } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=961976&r1=961975&r2=961976&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Thu Jul 8 23:09:22 2010 @@ -24,168 +24,171 @@ using Apache.NMS.ActiveMQ.Transport; namespace Apache.NMS.ActiveMQ.State { - /// - /// Tracks the state of a connection so a newly established transport can be - /// re-initialized to the state that was tracked. - /// - public class ConnectionStateTracker : CommandVisitorAdapter - { - - private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); - - protected Dictionary connectionStates = new Dictionary(); - - private bool _trackTransactions; - private bool _restoreSessions = true; - private bool _restoreConsumers = true; - private bool _restoreProducers = true; - private bool _restoreTransaction = true; - private bool _trackMessages = true; - private int _maxCacheSize = 256; - private int currentCacheSize; - private Dictionary messageCache = new Dictionary(); - private Queue messageCacheFIFO = new Queue(); - - protected void RemoveEldestInCache() - { - System.Collections.ICollection ic = messageCacheFIFO; - lock(ic.SyncRoot) - { - while(messageCacheFIFO.Count > MaxCacheSize) - { - messageCache.Remove(messageCacheFIFO.Dequeue()); - currentCacheSize = currentCacheSize - 1; - } - } - } - - private class RemoveTransactionAction : ThreadSimulator - { - private TransactionInfo info; - private ConnectionStateTracker cst; - - public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst) - { - this.info = info; - this.cst = aCst; - } - - public override void Run() - { - ConnectionId connectionId = info.ConnectionId; - ConnectionState cs = cst.connectionStates[connectionId]; - cs.removeTransactionState(info.TransactionId); - } - } - - /// - /// - /// - /// null if the command is not state tracked. - public Tracked track(Command command) - { - try - { - return (Tracked) command.visit(this); - } - catch(IOException e) - { - throw e; - } - catch(Exception e) - { - throw new IOException(e.Message); - } - } - - public void trackBack(Command command) - { - if(TrackMessages && command != null && command.IsMessage) - { - Message message = (Message) command; - if(message.TransactionId == null) - { - currentCacheSize = currentCacheSize + 1; - } - } - } - - public void DoRestore(ITransport transport) - { - // Restore the connections. - foreach(ConnectionState connectionState in connectionStates.Values) - { - transport.Oneway(connectionState.Info); - DoRestoreTempDestinations(transport, connectionState); - - if(RestoreSessions) - { - DoRestoreSessions(transport, connectionState); - } - - if(RestoreTransaction) - { - DoRestoreTransactions(transport, connectionState); - } - } - //now flush messages - foreach(Message msg in messageCache.Values) - { - transport.Oneway(msg); - } - } - - private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState) - { - AtomicCollection transactionStates = connectionState.TransactionStates; - foreach(TransactionState transactionState in transactionStates) - { - foreach(Command command in transactionState.Commands) - { - transport.Oneway(command); - } - } - } - - /// - /// - /// - /// - protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState) - { - // Restore the connection's sessions - foreach(SessionState sessionState in connectionState.SessionStates) - { - transport.Oneway(sessionState.Info); - - if(RestoreProducers) - { - DoRestoreProducers(transport, sessionState); - } - - if(RestoreConsumers) - { - DoRestoreConsumers(transport, sessionState); - } - } - } - - /// - /// - /// - /// - protected void DoRestoreConsumers(ITransport transport, SessionState sessionState) - { + /// + /// Tracks the state of a connection so a newly established transport can be + /// re-initialized to the state that was tracked. + /// + public class ConnectionStateTracker : CommandVisitorAdapter + { + + private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); + + protected Dictionary connectionStates = new Dictionary(); + + private bool _trackTransactions; + private bool _restoreSessions = true; + private bool _restoreConsumers = true; + private bool _restoreProducers = true; + private bool _restoreTransaction = true; + private bool _trackMessages = true; + private int _maxCacheSize = 256; + private int currentCacheSize; + private Dictionary messageCache = new Dictionary(); + private Queue messageCacheFIFO = new Queue(); + + protected void RemoveEldestInCache() + { + System.Collections.ICollection ic = messageCacheFIFO; + lock(ic.SyncRoot) + { + while(messageCacheFIFO.Count > MaxCacheSize) + { + messageCache.Remove(messageCacheFIFO.Dequeue()); + currentCacheSize = currentCacheSize - 1; + } + } + } + + private class RemoveTransactionAction : ThreadSimulator + { + private TransactionInfo info; + private ConnectionStateTracker cst; + + public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst) + { + this.info = info; + this.cst = aCst; + } + + public override void Run() + { + ConnectionId connectionId = info.ConnectionId; + ConnectionState cs = cst.connectionStates[connectionId]; + cs.removeTransactionState(info.TransactionId); + } + } + + /// + /// + /// + /// null if the command is not state tracked. + public Tracked track(Command command) + { + try + { + return (Tracked) command.visit(this); + } + catch(IOException e) + { + throw e; + } + catch(Exception e) + { + throw new IOException(e.Message); + } + } + + public void trackBack(Command command) + { + if(TrackMessages && command != null && command.IsMessage) + { + Message message = (Message) command; + if(message.TransactionId == null) + { + currentCacheSize = currentCacheSize + 1; + } + } + } + + public void DoRestore(ITransport transport) + { + // Restore the connections. + foreach(ConnectionState connectionState in connectionStates.Values) + { + ConnectionInfo info = connectionState.Info; + info.FailoverReconnect = true; + transport.Oneway(info); + + DoRestoreTempDestinations(transport, connectionState); + + if(RestoreSessions) + { + DoRestoreSessions(transport, connectionState); + } + + if(RestoreTransaction) + { + DoRestoreTransactions(transport, connectionState); + } + } + //now flush messages + foreach(Message msg in messageCache.Values) + { + transport.Oneway(msg); + } + } + + private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState) + { + AtomicCollection transactionStates = connectionState.TransactionStates; + foreach(TransactionState transactionState in transactionStates) + { + foreach(Command command in transactionState.Commands) + { + transport.Oneway(command); + } + } + } + + /// + /// + /// + /// + protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState) + { + // Restore the connection's sessions + foreach(SessionState sessionState in connectionState.SessionStates) + { + transport.Oneway(sessionState.Info); + + if(RestoreProducers) + { + DoRestoreProducers(transport, sessionState); + } + + if(RestoreConsumers) + { + DoRestoreConsumers(transport, sessionState); + } + } + } + + /// + /// + /// + /// + protected void DoRestoreConsumers(ITransport transport, SessionState sessionState) + { // Restore the session's consumers but possibly in pull only (prefetch 0 state) till // recovery completes. - ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId]; - bool connectionInterruptionProcessingComplete = + ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId]; + bool connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete; - // Restore the session's consumers - foreach(ConsumerState consumerState in sessionState.ConsumerStates) - { + // Restore the session's consumers + foreach(ConsumerState consumerState in sessionState.ConsumerStates) + { ConsumerInfo infoToSend = consumerState.Info; if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0) @@ -207,521 +210,521 @@ namespace Apache.NMS.ActiveMQ.State } transport.Oneway(infoToSend); - } - } + } + } + + /// + /// + /// + /// + protected void DoRestoreProducers(ITransport transport, SessionState sessionState) + { + // Restore the session's producers + foreach(ProducerState producerState in sessionState.ProducerStates) + { + transport.Oneway(producerState.Info); + } + } + + /// + /// + /// + /// + protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState) + { + // Restore the connection's temp destinations. + foreach(DestinationInfo destinationInfo in connectionState.TempDestinations) + { + transport.Oneway(destinationInfo); + } + } + + public override Response processAddDestination(DestinationInfo info) + { + if(info != null) + { + ConnectionState cs = connectionStates[info.ConnectionId]; + if(cs != null && info.Destination.IsTemporary) + { + cs.addTempDestination(info); + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveDestination(DestinationInfo info) + { + if(info != null) + { + ConnectionState cs = connectionStates[info.ConnectionId]; + if(cs != null && info.Destination.IsTemporary) + { + cs.removeTempDestination(info.Destination); + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddProducer(ProducerInfo info) + { + if(info != null && info.ProducerId != null) + { + SessionId sessionId = info.ProducerId.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.addProducer(info); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveProducer(ProducerId id) + { + if(id != null) + { + SessionId sessionId = id.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.removeProducer(id); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddConsumer(ConsumerInfo info) + { + if(info != null) + { + SessionId sessionId = info.ConsumerId.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.addConsumer(info); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveConsumer(ConsumerId id) + { + if(id != null) + { + SessionId sessionId = id.ParentId; + if(sessionId != null) + { + ConnectionId connectionId = sessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + SessionState ss = cs[sessionId]; + if(ss != null) + { + ss.removeConsumer(id); + } + } + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddSession(SessionInfo info) + { + if(info != null) + { + ConnectionId connectionId = info.SessionId.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + cs.addSession(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveSession(SessionId id) + { + if(id != null) + { + ConnectionId connectionId = id.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + cs.removeSession(id); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processAddConnection(ConnectionInfo info) + { + if(info != null) + { + connectionStates.Add(info.ConnectionId, new ConnectionState(info)); + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processRemoveConnection(ConnectionId id) + { + if(id != null) + { + connectionStates.Remove(id); + } + return TRACKED_RESPONSE_MARKER; + } + + public override Response processMessage(Message send) + { + if(send != null) + { + if(TrackTransactions && send.TransactionId != null) + { + ConnectionId connectionId = send.ProducerId.ParentId.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + TransactionState transactionState = cs[send.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(send); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + else if(TrackMessages) + { + messageCache.Add(send.MessageId, (Message) send.Clone()); + RemoveEldestInCache(); + } + } + return null; + } + + public override Response processMessageAck(MessageAck ack) + { + if(TrackTransactions && ack != null && ack.TransactionId != null) + { + ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + TransactionState transactionState = cs[ack.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(ack); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public override Response processBeginTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null && info.TransactionId != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + cs.addTransactionState(info.TransactionId); + TransactionState state = cs[info.TransactionId]; + state.addCommand(info); + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public override Response processPrepareTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public override Response processCommitTransactionOnePhase(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info, this)); + } + } + } + } + return null; + } + + public override Response processCommitTransactionTwoPhase(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info, this)); + } + } + } + } + return null; + } + + public override Response processRollbackTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info, this)); + } + } + } + } + return null; + } + + public override Response processEndTransaction(TransactionInfo info) + { + if(TrackTransactions && info != null) + { + ConnectionId connectionId = info.ConnectionId; + if(connectionId != null) + { + ConnectionState cs = connectionStates[connectionId]; + if(cs != null) + { + TransactionState transactionState = cs[info.TransactionId]; + if(transactionState != null) + { + transactionState.addCommand(info); + } + } + } + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public bool RestoreConsumers + { + get + { + return _restoreConsumers; + } + set + { + _restoreConsumers = value; + } + } + + public bool RestoreProducers + { + get + { + return _restoreProducers; + } + set + { + _restoreProducers = value; + } + } + + public bool RestoreSessions + { + get + { + return _restoreSessions; + } + set + { + _restoreSessions = value; + } + } + + public bool TrackTransactions + { + get + { + return _trackTransactions; + } + set + { + _trackTransactions = value; + } + } + + public bool RestoreTransaction + { + get + { + return _restoreTransaction; + } + set + { + _restoreTransaction = value; + } + } + + public bool TrackMessages + { + get + { + return _trackMessages; + } + set + { + _trackMessages = value; + } + } + + public int MaxCacheSize + { + get + { + return _maxCacheSize; + } + set + { + _maxCacheSize = value; + } + } + + public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId) + { + ConnectionState connectionState = connectionStates[connectionId]; + if(connectionState != null) + { + connectionState.ConnectionInterruptProcessingComplete = true; - /// - /// - /// - /// - protected void DoRestoreProducers(ITransport transport, SessionState sessionState) - { - // Restore the session's producers - foreach(ProducerState producerState in sessionState.ProducerStates) - { - transport.Oneway(producerState.Info); - } - } - - /// - /// - /// - /// - protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState) - { - // Restore the connection's temp destinations. - foreach(DestinationInfo destinationInfo in connectionState.TempDestinations) - { - transport.Oneway(destinationInfo); - } - } - - public override Response processAddDestination(DestinationInfo info) - { - if(info != null) - { - ConnectionState cs = connectionStates[info.ConnectionId]; - if(cs != null && info.Destination.IsTemporary) - { - cs.addTempDestination(info); - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveDestination(DestinationInfo info) - { - if(info != null) - { - ConnectionState cs = connectionStates[info.ConnectionId]; - if(cs != null && info.Destination.IsTemporary) - { - cs.removeTempDestination(info.Destination); - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddProducer(ProducerInfo info) - { - if(info != null && info.ProducerId != null) - { - SessionId sessionId = info.ProducerId.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.addProducer(info); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveProducer(ProducerId id) - { - if(id != null) - { - SessionId sessionId = id.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.removeProducer(id); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddConsumer(ConsumerInfo info) - { - if(info != null) - { - SessionId sessionId = info.ConsumerId.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.addConsumer(info); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveConsumer(ConsumerId id) - { - if(id != null) - { - SessionId sessionId = id.ParentId; - if(sessionId != null) - { - ConnectionId connectionId = sessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - SessionState ss = cs[sessionId]; - if(ss != null) - { - ss.removeConsumer(id); - } - } - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddSession(SessionInfo info) - { - if(info != null) - { - ConnectionId connectionId = info.SessionId.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - cs.addSession(info); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveSession(SessionId id) - { - if(id != null) - { - ConnectionId connectionId = id.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - cs.removeSession(id); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processAddConnection(ConnectionInfo info) - { - if(info != null) - { - connectionStates.Add(info.ConnectionId, new ConnectionState(info)); - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processRemoveConnection(ConnectionId id) - { - if(id != null) - { - connectionStates.Remove(id); - } - return TRACKED_RESPONSE_MARKER; - } - - public override Response processMessage(Message send) - { - if(send != null) - { - if(TrackTransactions && send.TransactionId != null) - { - ConnectionId connectionId = send.ProducerId.ParentId.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - TransactionState transactionState = cs[send.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(send); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - else if(TrackMessages) - { - messageCache.Add(send.MessageId, (Message) send.Clone()); - RemoveEldestInCache(); - } - } - return null; - } - - public override Response processMessageAck(MessageAck ack) - { - if(TrackTransactions && ack != null && ack.TransactionId != null) - { - ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - TransactionState transactionState = cs[ack.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(ack); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public override Response processBeginTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null && info.TransactionId != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - cs.addTransactionState(info.TransactionId); - TransactionState state = cs[info.TransactionId]; - state.addCommand(info); - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public override Response processPrepareTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public override Response processCommitTransactionOnePhase(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - return new Tracked(new RemoveTransactionAction(info, this)); - } - } - } - } - return null; - } - - public override Response processCommitTransactionTwoPhase(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - return new Tracked(new RemoveTransactionAction(info, this)); - } - } - } - } - return null; - } - - public override Response processRollbackTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - return new Tracked(new RemoveTransactionAction(info, this)); - } - } - } - } - return null; - } - - public override Response processEndTransaction(TransactionInfo info) - { - if(TrackTransactions && info != null) - { - ConnectionId connectionId = info.ConnectionId; - if(connectionId != null) - { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) - { - TransactionState transactionState = cs[info.TransactionId]; - if(transactionState != null) - { - transactionState.addCommand(info); - } - } - } - return TRACKED_RESPONSE_MARKER; - } - return null; - } - - public bool RestoreConsumers - { - get - { - return _restoreConsumers; - } - set - { - _restoreConsumers = value; - } - } - - public bool RestoreProducers - { - get - { - return _restoreProducers; - } - set - { - _restoreProducers = value; - } - } - - public bool RestoreSessions - { - get - { - return _restoreSessions; - } - set - { - _restoreSessions = value; - } - } - - public bool TrackTransactions - { - get - { - return _trackTransactions; - } - set - { - _trackTransactions = value; - } - } - - public bool RestoreTransaction - { - get - { - return _restoreTransaction; - } - set - { - _restoreTransaction = value; - } - } - - public bool TrackMessages - { - get - { - return _trackMessages; - } - set - { - _trackMessages = value; - } - } - - public int MaxCacheSize - { - get - { - return _maxCacheSize; - } - set - { - _maxCacheSize = value; - } - } - - public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId) - { - ConnectionState connectionState = connectionStates[connectionId]; - if(connectionState != null) - { - connectionState.ConnectionInterruptProcessingComplete = true; - - Dictionary stalledConsumers = connectionState.RecoveringPullConsumers; - foreach(KeyValuePair entry in stalledConsumers) - { - ConsumerControl control = new ConsumerControl(); - control.ConsumerId = entry.Key; - control.Prefetch = entry.Value.PrefetchSize; - control.Destination = entry.Value.Destination; - try - { - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("restored recovering consumer: " + control.ConsumerId + - " with: " + control.Prefetch); - } - transport.Oneway(control); - } - catch(Exception ex) - { - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId + - " with: " + control.Prefetch + "Error: " + ex.Message); - } - } - } - stalledConsumers.Clear(); - } - } - - public void TransportInterrupted() - { - foreach(ConnectionState connectionState in connectionStates.Values) - { - connectionState.ConnectionInterruptProcessingComplete = false; - } - } - } + Dictionary stalledConsumers = connectionState.RecoveringPullConsumers; + foreach(KeyValuePair entry in stalledConsumers) + { + ConsumerControl control = new ConsumerControl(); + control.ConsumerId = entry.Key; + control.Prefetch = entry.Value.PrefetchSize; + control.Destination = entry.Value.Destination; + try + { + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("restored recovering consumer: " + control.ConsumerId + + " with: " + control.Prefetch); + } + transport.Oneway(control); + } + catch(Exception ex) + { + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId + + " with: " + control.Prefetch + "Error: " + ex.Message); + } + } + } + stalledConsumers.Clear(); + } + } + + public void TransportInterrupted() + { + foreach(ConnectionState connectionState in connectionStates.Values) + { + connectionState.ConnectionInterruptProcessingComplete = false; + } + } + } }