activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r380183 [1/3] - in /incubator/activemq/trunk/openwire-dotnet: src/OpenWire.Client/ src/OpenWire.Client/Commands/ src/OpenWire.Client/Core/ src/OpenWire.Client/IO/ tests/OpenWire.Client/
Date Thu, 23 Feb 2006 18:16:23 GMT
Author: jstrachan
Date: Thu Feb 23 10:16:15 2006
New Revision: 380183

URL: http://svn.apache.org/viewcvs?rev=380183&view=rev
Log:
updated working .Net code which is capable of creating connections, sessions, producers and consumers - not quite completed the consumer side yet but we can send messages now! :)

Added:
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestMain.cs
Modified:
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs Thu Feb 23 10:16:15 2006
@@ -22,13 +22,13 @@
 
                 
                 public override bool IsMarshallAware() {
-                        return true; 
-                }     
+                        return true;
+                }
                 
                 // Properties
                 public IDestination FromDestination {
-                        get { return Destination; } 
-                        set { this.Destination = ActiveMQDestination.Transform(value); } 
+                        get { return Destination; }
+                        set { this.Destination = ActiveMQDestination.Transform(value); }
                 }
 
                 public void BeforeMarshall(OpenWireFormat wireFormat) {

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs Thu Feb 23 10:16:15 2006
@@ -3,28 +3,40 @@
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client.Commands {
-        /// <summary>
-        /// Summary description for ActiveMQQueue.
-        /// </summary>
-        public class ActiveMQQueue : ActiveMQDestination, IQueue {
-                public const byte ID_ActiveMQQueue = 100;
-
-                public ActiveMQQueue() : base() {
-                }
-                public ActiveMQQueue(String name) : base(name) {
-                }
-
-                public String QueueName {
-                        get { return PhysicalName; } 
-                }
-
-                public override int GetDestinationType() {
-                        return ACTIVEMQ_QUEUE; 
-                }
-
-                public override ActiveMQDestination CreateDestination(String name) {
-                        return new ActiveMQQueue(name); 
-                } 
-        } 
+namespace OpenWire.Client.Commands
+{
+    /// <summary>
+    /// Summary description for ActiveMQQueue.
+    /// </summary>
+    public class ActiveMQQueue : ActiveMQDestination, IQueue
+    {
+        public const byte ID_ActiveMQQueue = 100;
+        
+        public ActiveMQQueue() : base()
+        {
+        }
+        public ActiveMQQueue(String name) : base(name)
+        {
+        }
+        
+        public String QueueName
+        {
+            get { return PhysicalName; }
+        }
+        
+        public override byte GetDataStructureType()
+        {
+            return ID_ActiveMQQueue;
+        }
+        
+        public override int GetDestinationType()
+        {
+            return ACTIVEMQ_QUEUE;
+        }
+        
+        public override ActiveMQDestination CreateDestination(String name)
+        {
+            return new ActiveMQQueue(name);
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs Thu Feb 23 10:16:15 2006
@@ -3,28 +3,40 @@
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client.Commands {
-        /// <summary>
-        /// Summary description for ActiveMQTempQueue.
-        /// </summary>
-        public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue {
-                public const byte ID_ActiveMQTempQueue = 102;
-
-                public ActiveMQTempQueue() : base() {
-                }
-                public ActiveMQTempQueue(String name) : base(name) {
-                }
-
-                public String GetQueueName() {
-                        return PhysicalName; 
-                }
-
-                public override int GetDestinationType() {
-                        return ACTIVEMQ_QUEUE; 
-                }
-
-                public override ActiveMQDestination CreateDestination(String name) {
-                        return new ActiveMQTempQueue(name); 
-                } 
-        } 
+namespace OpenWire.Client.Commands
+{
+    /// <summary>
+    /// Summary description for ActiveMQTempQueue.
+    /// </summary>
+    public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue
+    {
+        public const byte ID_ActiveMQTempQueue = 102;
+        
+        public ActiveMQTempQueue() : base()
+        {
+        }
+        public ActiveMQTempQueue(String name) : base(name)
+        {
+        }
+        
+        public String GetQueueName()
+        {
+            return PhysicalName;
+        }
+        
+        public override byte GetDataStructureType()
+        {
+            return ID_ActiveMQTempQueue;
+        }
+        
+        public override int GetDestinationType()
+        {
+            return ACTIVEMQ_QUEUE;
+        }
+        
+        public override ActiveMQDestination CreateDestination(String name)
+        {
+            return new ActiveMQTempQueue(name);
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs Thu Feb 23 10:16:15 2006
@@ -3,27 +3,40 @@
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client.Commands {
-        /// <summary>
-        /// Summary description for ActiveMQTempTopic.
-        /// </summary>
-        public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic {
-                public const byte ID_ActiveMQTempTopic = 103;
-
-                public ActiveMQTempTopic() : base() {
-                }
-                public ActiveMQTempTopic(String name) : base(name) {
-                }
-
-                public String GetTopicName() {
-                        return PhysicalName; 
-                }
-                public override int GetDestinationType() {
-                        return ACTIVEMQ_TOPIC; 
-                }
-
-                public override ActiveMQDestination CreateDestination(String name) {
-                        return new ActiveMQTempTopic(name); 
-                } 
-        } 
+namespace OpenWire.Client.Commands
+{
+    /// <summary>
+    /// Summary description for ActiveMQTempTopic.
+    /// </summary>
+    public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic
+    {
+        public const byte ID_ActiveMQTempTopic = 103;
+        
+        public ActiveMQTempTopic() : base()
+        {
+        }
+        public ActiveMQTempTopic(String name) : base(name)
+        {
+        }
+        
+        public String GetTopicName()
+        {
+            return PhysicalName;
+        }
+        
+        public override byte GetDataStructureType()
+        {
+            return ID_ActiveMQTempTopic;
+        }
+        
+        public override int GetDestinationType()
+        {
+            return ACTIVEMQ_TOPIC;
+        }
+        
+        public override ActiveMQDestination CreateDestination(String name)
+        {
+            return new ActiveMQTempTopic(name);
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs Thu Feb 23 10:16:15 2006
@@ -3,28 +3,40 @@
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client.Commands {
-        /// <summary>
-        /// Summary description for ActiveMQTopic.
-        /// </summary>
-        public class ActiveMQTopic : ActiveMQDestination, ITopic {
-                public const byte ID_ActiveMQTopic = 101;
-
-                public ActiveMQTopic() : base() {
-                }
-                public ActiveMQTopic(String name) : base(name) {
-                }
-
-                public String TopicName {
-                        get { return PhysicalName; } 
-                }
-
-                public override int GetDestinationType() {
-                        return ACTIVEMQ_TOPIC; 
-                }
-
-                public override ActiveMQDestination CreateDestination(String name) {
-                        return new ActiveMQTopic(name); 
-                } 
-        } 
+namespace OpenWire.Client.Commands
+{
+    /// <summary>
+    /// Summary description for ActiveMQTopic.
+    /// </summary>
+    public class ActiveMQTopic : ActiveMQDestination, ITopic
+    {
+        public const byte ID_ActiveMQTopic = 101;
+        
+        public ActiveMQTopic() : base()
+        {
+        }
+        public ActiveMQTopic(String name) : base(name)
+        {
+        }
+        
+        public String TopicName
+        {
+            get { return PhysicalName; }
+        }
+        
+        public override byte GetDataStructureType()
+        {
+            return ID_ActiveMQTopic;
+        }
+        
+        public override int GetDestinationType()
+        {
+            return ACTIVEMQ_TOPIC;
+        }
+        
+        public override ActiveMQDestination CreateDestination(String name)
+        {
+            return new ActiveMQTopic(name);
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs Thu Feb 23 10:16:15 2006
@@ -13,51 +13,38 @@
 using OpenWire.Client;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client.Commands {
-
-        public class BaseCommand : AbstractCommand {
-                public const byte ID_BaseCommand = 0;
-
-                short commandId;
-                bool responseRequired;
-
-
-                public override int GetHashCode() {
-                        return commandId;
-                }
-
-                public override bool Equals(Object that) {
-                        if (that is BaseCommand) {
-                                BaseCommand thatCommand = (BaseCommand) that;
-                                return this.GetDataStructureType() == thatCommand.GetDataStructureType()
-                                        && this.CommandId == thatCommand.CommandId; 
-                        }
-                        return false; 
-                }
-
-                public override String ToString() {
-                        string answer = GetDataStructureTypeAsString(GetDataStructureType());
-                        if (answer.Length == 0) {
-                                answer = base.ToString(); 
-                        }
-                        return answer + ": id = " + CommandId; 
-                }
-
-                public override byte GetDataStructureType() {
-                        return ID_BaseCommand; 
-                }
-
-
-                // Properties
-
-                public short CommandId {
-                        get { return commandId; }
-                        set { this.commandId = value; } 
-                }
-
-                public bool ResponseRequired {
-                        get { return responseRequired; }
-                        set { this.responseRequired = value; } 
-                } 
-        } 
+namespace OpenWire.Client.Commands
+{
+    
+    public abstract class BaseCommand : AbstractCommand
+    {
+        
+        public override int GetHashCode()
+        {
+            return (CommandId * 37) + GetDataStructureType();
+        }
+        
+        public override bool Equals(Object that)
+        {
+            if (that is BaseCommand)
+            {
+                BaseCommand thatCommand = (BaseCommand) that;
+                return this.GetDataStructureType() == thatCommand.GetDataStructureType()
+                    && this.CommandId == thatCommand.CommandId;
+            }
+            return false;
+        }
+        
+        public override String ToString()
+        {
+            string answer = GetDataStructureTypeAsString(GetDataStructureType());
+            if (answer.Length == 0)
+            {
+                answer = base.ToString();
+            }
+            return answer + ": id = " + CommandId;
+        }
+        
+        
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs Thu Feb 23 10:16:15 2006
@@ -3,117 +3,214 @@
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client {
+namespace OpenWire.Client
+{
+    /// <summary>
+    /// Represents a connection with a message broker
+    /// </summary>
+    public class Connection : IConnection
+    {
+        static private char[] MAGIC = new char[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
+        
+        private ITransport transport;
+        private ConnectionInfo info;
+        private WireFormatInfo wireFormatInfo = new WireFormatInfo();
+        IList sessions = new ArrayList();
+        private bool transacted;
+        private bool connected;
+        private bool closed;
+        private AcknowledgementMode acknowledgementMode;
+        private long sessionCounter;
+        private IDictionary consumers = new Hashtable(); // TODO threadsafe
+        
+        
+        public Connection(ITransport transport, ConnectionInfo info)
+        {
+            this.transport = transport;
+            this.info = info;
+            this.transport.Command += new CommandHandler(OnCommand);
+        }
+        
+        
         /// <summary>
-        /// Represents a connection with a message broker
+        /// Creates a new session to work on this connection
         /// </summary>
-        public class Connection : IConnection {
-
-                private ConnectionInfo info;
-                private ITransport transport;
-                IList sessions = new ArrayList();
-                private bool transacted;
-                private bool connected;
-                private bool closed;
-                private AcknowledgementMode acknowledgementMode;
-                private long sessionCounter;
-
-                public Connection(ITransport transport, ConnectionInfo info) {
-                        this.transport = transport;
-                        this.info = info; 
-                }
-
-                /// <summary>
-                /// Creates a new session to work on this connection
-                /// </summary>
-                public ISession CreateSession() {
-                        return CreateSession(transacted, acknowledgementMode); 
-                }
-
-                /// <summary>
-                /// Creates a new session to work on this connection
-                /// </summary>
-                public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) {
-                        CheckConnected();
-                        SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
-                        SyncRequest(info);
-                        Session session = new Session(this, info);
-                        sessions.Add(session);
-                        return session; 
-                }
-
-                public void Dispose() {
-                        foreach (Session session in sessions) {
-                                session.Dispose(); 
-                        }
-                        sessions.Clear();
-                        transport.Dispose();
-                        closed = true; 
-                }
-
-                // Properties
-
-                public ITransport ITransport {
-                        get { return transport; }
-                        set { this.transport = value; } 
-                }
-
-                public bool Transacted {
-                        get { return transacted; }
-                        set { this.transacted = value; } 
-                }
-
-                public AcknowledgementMode AcknowledgementMode {
-                        get { return acknowledgementMode; }
-                        set { this.acknowledgementMode = value; } 
-                }
-
-                public string ClientId {
-                        get { return info.ClientId; }
-                        set {
-                                if (connected) {
-                                        throw new OpenWireException("You cannot change the ClientId once the Connection is connected"); 
-                                }
-                                info.ClientId = value;
-                        }
-                }
-
-                // Implementation methods
-
-                /// <summary>
-                /// Performs a synchronous request-response with the broker
-                /// </summary>
-                public Response SyncRequest(Command command) {
-                        CheckConnected();
-                        Response response = ITransport.Request(command);
-                        if (response is ExceptionResponse) {
-                                ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-                                // TODO include stack trace
-                                throw new OpenWireException("Request failed: " + exceptionResponse); 
-                        }
-                        return response; 
-                }
-
-
-                protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode) {
-                        SessionInfo answer = new SessionInfo();
-                        SessionId sessionId = new SessionId();
-                        sessionId.ConnectionId = info.ConnectionId.Value;
-                        lock (this) {
-                                sessionId.Value = ++sessionCounter; 
-                        }
-                        answer.SessionId = sessionId;
-                        return answer; 
-                }
-
-                protected void CheckConnected() {
-                        if (closed) {
-                                throw new ConnectionClosedException(); 
-                        }
-                        if (!connected) {
-                                SyncRequest(info);
-                                connected = true; 
-                        } 
-                } 
-        } 
+        public ISession CreateSession()
+        {
+            return CreateSession(transacted, acknowledgementMode);
+        }
+        
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode)
+        {
+            CheckConnected();
+            SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
+            SyncRequest(info);
+            Session session = new Session(this, info);
+            sessions.Add(session);
+            return session;
+        }
+        
+        public void Dispose()
+        {
+            foreach (Session session in sessions)
+            {
+                session.Dispose();
+            }
+            sessions.Clear();
+            transport.Dispose();
+            closed = true;
+        }
+        
+        // Properties
+        
+        public ITransport ITransport
+        {
+            get { return transport; }
+            set { this.transport = value; }
+        }
+        
+        public bool Transacted
+        {
+            get { return transacted; }
+            set { this.transacted = value; }
+        }
+        
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { this.acknowledgementMode = value; }
+        }
+        
+        public string ClientId
+        {
+            get { return info.ClientId; }
+            set {
+                if (connected)
+                {
+                    throw new OpenWireException("You cannot change the ClientId once the Connection is connected");
+                }
+                info.ClientId = value;
+            }
+        }
+        
+        // Implementation methods
+        
+        /// <summary>
+        /// Performs a synchronous request-response with the broker
+        /// </summary>
+        public Response SyncRequest(Command command)
+        {
+            Response response = transport.Request(command);
+            if (response is ExceptionResponse)
+            {
+                ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+                // TODO include stack trace
+                throw new OpenWireException("Request failed: " + exceptionResponse);
+            }
+            return response;
+        }
+        
+        
+        protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode)
+        {
+            SessionInfo answer = new SessionInfo();
+            SessionId sessionId = new SessionId();
+            sessionId.ConnectionId = info.ConnectionId.Value;
+            lock (this)
+            {
+                sessionId.Value = ++sessionCounter;
+            }
+            answer.SessionId = sessionId;
+            return answer;
+        }
+        
+        protected void CheckConnected()
+        {
+            if (closed)
+            {
+                throw new ConnectionClosedException();
+            }
+            if (!connected)
+            {
+                Console.WriteLine("ConnectionId: " + info.ConnectionId.Value);
+                Console.WriteLine("ClientID: " + info.ClientId);
+                
+                Console.WriteLine("About to send WireFormatInfo: " + wireFormatInfo);
+                // lets configure the wire format
+                wireFormatInfo.Magic = CreateMagicBytes();
+                wireFormatInfo.Version = 1;
+                transport.Oneway(wireFormatInfo);
+                
+                Console.WriteLine("About to send ConnectionInfo: " + info);
+                SyncRequest(info);
+                Console.WriteLine("Received connection info response");
+                connected = true;
+            }
+        }
+        
+        /// <summary>
+        /// Register a new consumer
+        /// </summary>
+        /// <param name="consumerId">A  ConsumerId</param>
+        /// <param name="consumer">A  MessageConsumer</param>
+        public void AddConsumer(ConsumerId consumerId, MessageConsumer consumer)
+        {
+            Console.WriteLine("#### Adding consumerId: " + consumerId.Value + " session: " + consumerId.SessionId + " with consumer: " + consumer);
+            consumers[consumerId] = consumer;
+        }
+        
+        
+        /// <summary>
+        /// Remove a consumer
+        /// </summary>
+        /// <param name="consumerId">A  ConsumerId</param>
+        public void RemoveConsumer(ConsumerId consumerId)
+        {
+            consumers[consumerId] = null;
+        }
+        
+        
+        /// <summary>
+        /// Handle incoming commands
+        /// </summary>
+        /// <param name="transport">An ITransport</param>
+        /// <param name="command">A  Command</param>
+        protected void OnCommand(ITransport transport, Command command)
+        {
+            if (command is MessageDispatch) {
+                MessageDispatch dispatch = (MessageDispatch) command;
+                ConsumerId consumerId = dispatch.ConsumerId;
+                MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+                if (consumer == null) {
+                    Console.WriteLine("No such consumer active: " + consumerId);
+                    Console.WriteLine("No such consumer active: " + consumerId.Value);
+                    Console.WriteLine("No such consumer active: " + consumerId.SessionId);
+                }
+                else {
+                    ActiveMQMessage message = (ActiveMQMessage) dispatch.Message;
+                    consumer.Dispatch(message);
+                }
+            }
+            else {
+                Console.WriteLine("Unknown command: " + command);
+            }
+        }
+        
+        /// <summary>
+        /// Method CreateMagicBytes
+        /// </summary>
+        /// <returns>A  byte[]</returns>
+        private byte[] CreateMagicBytes()
+        {
+            byte[] answer = new byte[MAGIC.Length];
+            for (int i = 0; i < answer.Length; i++)
+            {
+                answer[i] = (byte) MAGIC[i];
+            }
+            return answer;
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs Thu Feb 23 10:16:15 2006
@@ -3,82 +3,102 @@
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client {
-        /// <summary>
-        /// Represents a connection with a message broker
-        /// </summary>
-        public class ConnectionFactory : IConnectionFactory {
-                private string host = "localhost";
-                private int port = 61616;
-                private string userName;
-                private string password;
-                private string clientId;
-
-                public ConnectionFactory() {
-                }
-
-                public ConnectionFactory(string host, int port) {
-                        this.host = host;
-                        this.port = port; 
-                }
-
-                public IConnection CreateConnection() {
-                        return CreateConnection(userName, password); 
-                }
-
-                public IConnection CreateConnection(string userName, string password) {
-                        ConnectionInfo info = CreateConnectionInfo(userName, password);
-                        ITransport transport = CreateITransport();
-                        Connection connection = new Connection(transport, info);
-                        connection.ClientId = clientId;
-                        return connection; 
-                }
-
-                // Properties
-
-                public string Host {
-                        get { return host; }
-                        set { host = value; } 
-                }
-
-                public int Port {
-                        get { return port; }
-                        set { port = value; } 
-                }
-
-                public string UserName {
-                        get { return userName; }
-                        set { userName = value; } 
-                }
-
-                public string Password {
-                        get { return password; }
-                        set { password = value; } 
-                }
-
-                public string ClientId {
-                        get { return clientId; }
-                        set { clientId = value; } 
-                }
-
-                // Implementation methods
-
-                protected ConnectionInfo CreateConnectionInfo(string userName, string password) {
-                        ConnectionInfo answer = new ConnectionInfo();
-                        ConnectionId connectionId = new ConnectionId();
-                        connectionId.Value = CreateNewConnectionID();
-                        answer.ConnectionId = connectionId;
-                        answer.UserName = userName;
-                        answer.Password = password;
-                        return answer; 
-                }
-
-                protected string CreateNewConnectionID() {
-                        return Guid.NewGuid().ToString(); 
-                }
-
-                protected ITransport CreateITransport() {
-                        return new SocketTransport(host, port); 
-                } 
-        } 
+namespace OpenWire.Client
+{
+    /// <summary>
+    /// Creates connections to a message broker using the configured
+    /// host, port, credentials and client id
+    /// </summary>
+    public class ConnectionFactory : IConnectionFactory
+    {
+        private string host = "localhost";
+        private int port = 61616;
+        private string userName;
+        private string password;
+        private string clientId;
+        
+        public ConnectionFactory()
+        {
+        }
+        
+        public ConnectionFactory(string host, int port)
+        {
+            this.host = host;
+            this.port = port;
+        }
+        
+        public IConnection CreateConnection()
+        {
+            return CreateConnection(this.userName, this.password);
+        }
+        
+        public IConnection CreateConnection(string userName, string password)
+        {
+            ConnectionInfo info = CreateConnectionInfo(userName, password);
+            Connection connection = new Connection(CreateTransport(), info);
+            // the connection takes the client id chosen for its ConnectionInfo
+            connection.ClientId = info.ClientId;
+            return connection;
+        }
+        
+        // Properties
+        
+        public string Host
+        {
+            get { return this.host; }
+            set { this.host = value; }
+        }
+        
+        public int Port
+        {
+            get { return this.port; }
+            set { this.port = value; }
+        }
+        
+        public string UserName
+        {
+            get { return this.userName; }
+            set { this.userName = value; }
+        }
+        
+        public string Password
+        {
+            get { return this.password; }
+            set { this.password = value; }
+        }
+        
+        public string ClientId
+        {
+            get { return this.clientId; }
+            set { this.clientId = value; }
+        }
+        
+        // Implementation methods
+        
+        protected ConnectionInfo CreateConnectionInfo(string userName, string password)
+        {
+            ConnectionInfo info = new ConnectionInfo();
+            ConnectionId id = new ConnectionId();
+            id.Value = CreateNewGuid();
+            info.ConnectionId = id;
+            
+            info.UserName = userName;
+            info.Password = password;
+            // fall back to a freshly generated id when no client id was configured
+            info.ClientId = (clientId != null) ? clientId : CreateNewGuid();
+            return info;
+        }
+        
+        // Returns the string form of a newly generated Guid
+        protected string CreateNewGuid()
+        {
+            return Guid.NewGuid().ToString();
+        }
+        
+        // Builds a SocketTransport for the configured host and port
+        protected ITransport CreateTransport()
+        {
+            return new SocketTransport(host, port);
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs Thu Feb 23 10:16:15 2006
@@ -3,85 +3,111 @@
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client.Core {
-        /// <summary>
-        /// Summary description for AbstractCommand.
-        /// </summary>
-        public abstract class AbstractCommand : Command {
-
-                protected AbstractCommand() {
-                }
-
-                public virtual byte GetDataStructureType() {
-                        return 0; 
-                }             
-                
-                public virtual bool IsMarshallAware() {
-                        return false; 
-                }             
-                
-                public static String GetDataStructureTypeAsString(int type) {
-                        String packetTypeStr = "";
-                        switch (type) {
-                                case ActiveMQMessage.ID_ActiveMQMessage :
-                                        packetTypeStr = "ACTIVEMQ_MESSAGE";
-                                        break;
-                                case ActiveMQTextMessage.ID_ActiveMQTextMessage :
-                                        packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
-                                        break;
-                                case ActiveMQObjectMessage.ID_ActiveMQObjectMessage:
-                                        packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
-                                        break;
-                                case ActiveMQBytesMessage.ID_ActiveMQBytesMessage :
-                                        packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
-                                        break;
-                                case ActiveMQStreamMessage.ID_ActiveMQStreamMessage :
-                                        packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
-                                        break;
-                                case ActiveMQMapMessage.ID_ActiveMQMapMessage :
-                                        packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
-                                        break;
-                                case MessageAck.ID_MessageAck :
-                                        packetTypeStr = "ACTIVEMQ_MSG_ACK";
-                                        break;
-                                case Response.ID_Response :
-                                        packetTypeStr = "RESPONSE";
-                                        break;
-                                case ConsumerInfo.ID_ConsumerInfo :
-                                        packetTypeStr = "CONSUMER_INFO";
-                                        break;
-                                case ProducerInfo.ID_ProducerInfo :
-                                        packetTypeStr = "PRODUCER_INFO";
-                                        break;
-                                case TransactionInfo.ID_TransactionInfo :
-                                        packetTypeStr = "TRANSACTION_INFO";
-                                        break;
-                                case BrokerInfo.ID_BrokerInfo :
-                                        packetTypeStr = "BROKER_INFO";
-                                        break;
-                                case ConnectionInfo.ID_ConnectionInfo :
-                                        packetTypeStr = "CONNECTION_INFO";
-                                        break;
-                                case SessionInfo.ID_SessionInfo :
-                                        packetTypeStr = "SESSION_INFO";
-                                        break;
-                                case RemoveSubscriptionInfo.ID_RemoveSubscriptionInfo :
-                                        packetTypeStr = "DURABLE_UNSUBSCRIBE";
-                                        break;
-                                case IntegerResponse.ID_IntegerResponse :
-                                        packetTypeStr = "INT_RESPONSE_RECEIPT_INFO";
-                                        break;
-                                case WireFormatInfo.ID_WireFormatInfo :
-                                        packetTypeStr = "WIRE_FORMAT_INFO";
-                                        break;
-                                case RemoveInfo.ID_RemoveInfo :
-                                        packetTypeStr = "REMOVE_INFO";
-                                        break;
-                                case KeepAliveInfo.ID_KeepAliveInfo :
-                                        packetTypeStr = "KEEP_ALIVE";
-                                        break;
-                        }
-                        return packetTypeStr; 
-                } 
-        } 
+namespace OpenWire.Client.Core
+{
+    /// <summary>
+    /// Base class for commands: holds the command id and the
+    /// response-required flag, and maps data structure type
+    /// codes to readable names
+    /// </summary>
+    public abstract class AbstractCommand : Command
+    {
+        private short commandId;
+        private bool responseRequired;
+        
+        /// <summary>
+        /// Only derived classes can construct an instance
+        /// </summary>
+        protected AbstractCommand()
+        {
+        }
+        
+        /// <summary>
+        /// Data structure type code; this base implementation returns 0
+        /// </summary>
+        public virtual byte GetDataStructureType()
+        {
+            return 0;
+        }
+        
+        /// <summary>
+        /// Marshall-aware flag; this base implementation returns false
+        /// </summary>
+        public virtual bool IsMarshallAware()
+        {
+            return false;
+        }
+        
+        // Properties
+        
+        /// <summary>
+        /// Gets or sets the command id
+        /// </summary>
+        public short CommandId
+        {
+            get { return this.commandId; }
+            set { this.commandId = value; }
+        }
+        
+        /// <summary>
+        /// Gets or sets the response-required flag
+        /// </summary>
+        public bool ResponseRequired
+        {
+            get { return this.responseRequired; }
+            set { this.responseRequired = value; }
+        }
+        
+        /// <summary>
+        /// Maps a data structure type code to its packet type name
+        /// </summary>
+        /// <param name="type">The type code to look up</param>
+        /// <returns>The matching name, or "" for a code that is not listed</returns>
+        public static String GetDataStructureTypeAsString(int type)
+        {
+            switch (type)
+            {
+                case ActiveMQMessage.ID_ActiveMQMessage:
+                    return "ACTIVEMQ_MESSAGE";
+                case ActiveMQTextMessage.ID_ActiveMQTextMessage:
+                    return "ACTIVEMQ_TEXT_MESSAGE";
+                case ActiveMQObjectMessage.ID_ActiveMQObjectMessage:
+                    return "ACTIVEMQ_OBJECT_MESSAGE";
+                case ActiveMQBytesMessage.ID_ActiveMQBytesMessage:
+                    return "ACTIVEMQ_BYTES_MESSAGE";
+                case ActiveMQStreamMessage.ID_ActiveMQStreamMessage:
+                    return "ACTIVEMQ_STREAM_MESSAGE";
+                case ActiveMQMapMessage.ID_ActiveMQMapMessage:
+                    return "ACTIVEMQ_MAP_MESSAGE";
+                case MessageAck.ID_MessageAck:
+                    return "ACTIVEMQ_MSG_ACK";
+                case Response.ID_Response:
+                    return "RESPONSE";
+                case ConsumerInfo.ID_ConsumerInfo:
+                    return "CONSUMER_INFO";
+                case ProducerInfo.ID_ProducerInfo:
+                    return "PRODUCER_INFO";
+                case TransactionInfo.ID_TransactionInfo:
+                    return "TRANSACTION_INFO";
+                case BrokerInfo.ID_BrokerInfo:
+                    return "BROKER_INFO";
+                case ConnectionInfo.ID_ConnectionInfo:
+                    return "CONNECTION_INFO";
+                case SessionInfo.ID_SessionInfo:
+                    return "SESSION_INFO";
+                case RemoveSubscriptionInfo.ID_RemoveSubscriptionInfo:
+                    return "DURABLE_UNSUBSCRIBE";
+                case IntegerResponse.ID_IntegerResponse:
+                    return "INT_RESPONSE_RECEIPT_INFO";
+                case WireFormatInfo.ID_WireFormatInfo:
+                    return "WIRE_FORMAT_INFO";
+                case RemoveInfo.ID_RemoveInfo:
+                    return "REMOVE_INFO";
+                case KeepAliveInfo.ID_KeepAliveInfo:
+                    return "KEEP_ALIVE";
+                default:
+                    return "";
+            }
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs Thu Feb 23 10:16:15 2006
@@ -5,89 +5,115 @@
 using OpenWire.Client.Core;
 using OpenWire.Client.IO;
 
-namespace OpenWire.Client.Core {
-        /// <summary>
-        /// Represents a stream of boolean flags
-        /// </summary>
-        public class BooleanStream {
-                byte[] data = new byte[48];
-                short arrayLimit;
-                short arrayPos;
-                byte bytePos;
-
-                public bool ReadBoolean() {
-                        byte b = data[arrayPos];
-                        bool rc = ((b >> bytePos) & 0x01) != 0;
-                        bytePos++;
-                        if (bytePos >= 8) {
-                                bytePos = 0;
-                                arrayPos++;
-                        }
-                        return rc;
-                }
-
-                public void WriteBoolean(bool value) {
-                        if (bytePos == 0) {
-                                arrayLimit++;
-                                if (arrayLimit >= data.Length) {
-                                        // re-grow the array.
-                                        byte[] d = new byte[data.Length * 2];
-                                        for (int i = 0; i < data.Length; i++) {
-                                                d[i] = data[i];
-                                        }
-                                        data = d;
-                                }
-                        }
-                        if (value) {
-                                data[arrayPos] |= (byte) (0x01 << bytePos);
-                        }
-                        bytePos++;
-                        if (bytePos >= 8) {
-                                bytePos = 0;
-                                arrayPos++;
-                        }
-                }
-
-                public void Marshal(BinaryWriter dataOut) {
-                        if (arrayLimit < 64) {
-                                dataOut.Write((byte) arrayLimit);
-                        } else if (arrayLimit < 256) { // max value of unsigned byte
-                                dataOut.Write((byte) 0xC0);
-                                dataOut.Write((byte) arrayLimit);
-                        } else {
-                                dataOut.Write((byte) 0xE0);
-                                dataOut.Write((short) arrayLimit);
-                        }
-
-                        dataOut.Write(data, 0, arrayLimit);
-                        Clear();
-                }
-
-                public void Unmarshal(BinaryReader dataIn) {
-                        arrayLimit = dataIn.ReadByte();
-                        if ((arrayLimit & 0xE0) != 0) {
-                                arrayLimit = dataIn.ReadInt16();
-                        } else if ((arrayLimit & 0xC0) != 0) {
-                                arrayLimit = (short) (dataIn.ReadByte() & 0xFF);
-                        }
-                        if (data.Length < arrayLimit) {
-                                data = new byte[arrayLimit];
-                        }
-                        dataIn.Read(data, 0, arrayLimit);
-                        Clear();
-                }
-
-                public void Clear() {
-                        arrayPos = 0;
-                        bytePos = 0;
-                }
-
-                public int MarshalledSize() {
-                        if (arrayLimit < 64) {
-                                return 1 + arrayLimit;
-                        } else {
-                                return 2 + arrayLimit;
-                        }
-                }
+namespace OpenWire.Client.Core
+{
+    /// <summary>
+    /// Represents a stream of boolean flags, packed eight to a byte (lowest bit first)
+    /// </summary>
+    public class BooleanStream
+    {
+        byte[] data = new byte[48];
+        short arrayLimit;
+        short arrayPos;
+        byte bytePos;
+        
+        /// <summary>Returns the flag at the current position and moves on one bit</summary>
+        public bool ReadBoolean()
+        {
+            bool rc = ((data[arrayPos] >> bytePos) & 0x01) != 0;
+            bytePos++;
+            if (bytePos >= 8)
+            {
+                bytePos = 0;
+                arrayPos++;
+            }
+            return rc;
+        }
+        
+        /// <summary>Stores a flag at the current position and moves on one bit</summary>
+        public void WriteBoolean(bool value)
+        {
+            if (bytePos == 0)
+            {
+                arrayLimit++;
+                if (arrayLimit >= data.Length)
+                {
+                    // re-grow the array.
+                    byte[] d = new byte[data.Length * 2];
+                    data.CopyTo(d, 0);
+                    data = d;
+                }
+            }
+            if (value)
+            {
+                data[arrayPos] |= (byte) (0x01 << bytePos);
+            }
+            bytePos++;
+            if (bytePos >= 8)
+            {
+                bytePos = 0;
+                arrayPos++;
+            }
+        }
+        
+        /// <summary>Writes the length header and flag bytes, then rewinds via Clear</summary>
+        public void Marshal(BinaryWriter dataOut)
+        {
+            if (arrayLimit < 64)
+            {
+                dataOut.Write((byte) arrayLimit);
+            }
+            else if (arrayLimit < 256)
+            { // max value of unsigned byte
+                dataOut.Write((byte) 0xC0);
+                dataOut.Write((byte) arrayLimit);
+            }
+            else
+            {
+                dataOut.Write((byte) 0xE0);
+                DataStreamMarshaller.WriteShort(arrayLimit, dataOut);
+            }
+            dataOut.Write(data, 0, arrayLimit);
+            Clear();
+        }
+        
+        /// <summary>Reads the length header and flag bytes in the layout Marshal writes</summary>
+        public void Unmarshal(BinaryReader dataIn)
+        {
+            // 0xE0 and 0xC0 are the markers Marshal writes; match them exactly, since a
+            // bit mask also matches plain lengths 32..63 and never reaches the 0xC0 case
+            int first = DataStreamMarshaller.ReadByte(dataIn) & 0xFF;
+            arrayLimit = (short) first;
+            if (first == 0xE0)
+            {
+                arrayLimit = DataStreamMarshaller.ReadShort(dataIn);
+            }
+            else if (first == 0xC0)
+            {
+                arrayLimit = (short) (DataStreamMarshaller.ReadByte(dataIn) & 0xFF);
+            }
+            if (data.Length < arrayLimit)
+            {
+                data = new byte[arrayLimit];
+            }
+            // ReadBytes reads until arrayLimit bytes or end of stream; one Read call may stop short
+            dataIn.ReadBytes(arrayLimit).CopyTo(data, 0);
+            Clear();
+        }
+        
+        /// <summary>Moves the read/write position back to the first bit</summary>
+        public void Clear()
+        {
+            arrayPos = 0;
+            bytePos = 0;
+        }
+        
+        /// <summary>Number of bytes Marshal writes: length header plus flag bytes</summary>
+        public int MarshalledSize()
+        {
+            // header: 1 byte below 64, marker + byte below 256, else marker + short (2 bytes assumed)
+            int header = (arrayLimit < 64) ? 1 : ((arrayLimit < 256) ? 2 : 3);
+            return header + arrayLimit;
         }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Command.cs Thu Feb 23 10:16:15 2006
@@ -1,22 +1,23 @@
 using System;
 using OpenWire.Client.Core;
 
-namespace OpenWire.Client.Core {
-        /// <summary>
-        /// An OpenWire command
-        /// </summary>
-        public interface Command : DataStructure {
-                
-                /* TODO
-                short CommandId {
-                        get;
-                        set; 
-                }
-
-                bool ResponseRequired {
-                        get;
-                        set;
-                } 
-                */
-        } 
+namespace OpenWire.Client.Core
+{
+    /// <summary>
+    /// An OpenWire command: a DataStructure that additionally
+    /// exposes a readable and writable command id and
+    /// response-required flag
+    /// </summary>
+    public interface Command : DataStructure
+    {
+        /// <summary>
+        /// Gets or sets the command id
+        /// </summary>
+        short CommandId { get; set; }
+        
+        /// <summary>
+        /// Gets or sets the response-required flag
+        /// </summary>
+        bool ResponseRequired { get; set; }
+    }
 }



Mime
View raw message