activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r380183 [2/3] - in /incubator/activemq/trunk/openwire-dotnet: src/OpenWire.Client/ src/OpenWire.Client/Commands/ src/OpenWire.Client/Core/ src/OpenWire.Client/IO/ tests/OpenWire.Client/
Date Thu, 23 Feb 2006 18:16:23 GMT
Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -1,401 +1,516 @@
 using System;
 using System.IO;
+using System.Net;
 
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
 using OpenWire.Client.IO;
 
-namespace OpenWire.Client.Core {
-        /// <summary>
-        /// A base class with useful implementation inheritence methods
-        /// for creating marshallers of the OpenWire protocol
-        /// </summary>
-        public abstract class DataStreamMarshaller {
-
-                public abstract DataStructure CreateObject();
-                public abstract byte GetDataStructureType();
-
-                public virtual int Marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) {
-                        return 0;
-                }
-                public virtual void Marshal2(
-                        OpenWireFormat wireFormat,
-                        Object o,
-                        BinaryWriter dataOut,
-                        BooleanStream bs) {
-                }
-
-                public virtual void Unmarshal(
-                        OpenWireFormat wireFormat,
-                        Object o,
-                        BinaryReader dataIn,
-                        BooleanStream bs) {
-                }
-
-                public virtual int Marshal1Long(OpenWireFormat wireFormat, long o, BooleanStream bs) {
-                        if (o == 0L) {
-                                bs.WriteBoolean(false);
-                                bs.WriteBoolean(false);
-                                return 0;
-                        } else {
-                                ulong ul = (ulong) o;
-                                if ((ul & 0xFFFFFFFFFFFF0000ul) == 0L) {
-                                        bs.WriteBoolean(false);
-                                        bs.WriteBoolean(true);
-                                        return 2;
-                                } else if ((ul & 0xFFFFFFFF00000000ul) == 0L) {
-                                        bs.WriteBoolean(true);
-                                        bs.WriteBoolean(false);
-                                        return 4;
-                                } else {
-                                        bs.WriteBoolean(true);
-                                        bs.WriteBoolean(true);
-                                        return 8;
-                                }
-                        }
-                }
-
-                public virtual void Marshal2Long(
-                        OpenWireFormat wireFormat,
-                        long o,
-                        BinaryWriter dataOut,
-                        BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-                                if (bs.ReadBoolean()) {
-                                        dataOut.Write(o);
-                                } else {
-                                        dataOut.Write((int) o);
-                                }
-                        } else {
-                                if (bs.ReadBoolean()) {
-                                        dataOut.Write((short) o);
-                                }
-                        }
-                }
-                public virtual long UnmarshalLong(OpenWireFormat wireFormat, BinaryReader dataIn, BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-                                if (bs.ReadBoolean()) {
-                                        return dataIn.ReadInt64();
-                                } else {
-                                        return dataIn.ReadInt32();
-                                }
-                        } else {
-                                if (bs.ReadBoolean()) {
-                                        return dataIn.ReadInt16();
-                                } else {
-                                        return 0;
-                                }
-                        }
-                }
-
-                protected virtual DataStructure UnmarshalNestedObject(
-                        OpenWireFormat wireFormat,
-                        BinaryReader dataIn,
-                        BooleanStream bs) {
-                        return wireFormat.UnmarshalNestedObject(dataIn, bs);
-                }
-
-                protected virtual int Marshal1NestedObject(
-                        OpenWireFormat wireFormat,
-                        DataStructure o,
-                        BooleanStream bs) {
-                        return wireFormat.Marshal1NestedObject(o, bs);
-                }
-
-                protected virtual void Marshal2NestedObject(
-                        OpenWireFormat wireFormat,
-                        DataStructure o,
-                        BinaryWriter dataOut,
-                        BooleanStream bs) {
-                        wireFormat.Marshal2NestedObject(o, dataOut, bs);
-                }
-
-                protected virtual DataStructure UnmarshalCachedObject(
-                        OpenWireFormat wireFormat,
-                        BinaryReader dataIn,
-                        BooleanStream bs) {
-                        /*
-                        if (wireFormat.isCacheEnabled()) {
-                                if (bs.ReadBoolean()) {
-                                        short index = dataIn.ReadInt16();
-                                        DataStructure value = wireFormat.UnmarshalNestedObject(dataIn, bs);
-                                        wireFormat.setInUnmarshallCache(index, value);
-                                        return value;
-                                } else {
-                                        short index = dataIn.ReadInt16();
-                                        return wireFormat.getFromUnmarshallCache(index);
-                                }
-                        } else {
-                                return wireFormat.UnmarshalNestedObject(dataIn, bs);
-                        }
-                         */
-                        return wireFormat.UnmarshalNestedObject(dataIn, bs);
-                }
-
-                protected virtual int Marshal1CachedObject(
-                        OpenWireFormat wireFormat,
-                        DataStructure o,
-                        BooleanStream bs) {
-                        /*
-                        if (wireFormat.isCacheEnabled()) {
-                                Short index = wireFormat.getMarshallCacheIndex(o);
-                                bs.WriteBoolean(index == null);
-                                if (index == null) {
-                                        int rc = wireFormat.Marshal1NestedObject(o, bs);
-                                        wireFormat.addToMarshallCache(o);
-                                        return 2 + rc;
-                                } else {
-                                        return 2;
-                                }
-                        } else {
-                                return wireFormat.Marshal1NestedObject(o, bs);
-                        }
-                         */
-                        return wireFormat.Marshal1NestedObject(o, bs);
-                }
-
-                protected virtual void Marshal2CachedObject(
-                        OpenWireFormat wireFormat,
-                        DataStructure o,
-                        BinaryWriter dataOut,
-                        BooleanStream bs) {
-                        /*
-                        if (wireFormat.isCacheEnabled()) {
-                                Short index = wireFormat.getMarshallCacheIndex(o);
-                                if (bs.ReadBoolean()) {
-                                        dataOut.Write((short)index.shortValue());
-                                        wireFormat.Marshal2NestedObject(o, dataOut, bs);
-                                } else {
-                                        dataOut.Write((short)index.shortValue());
-                                }
-                        } else {
-                                wireFormat.Marshal2NestedObject(o, dataOut, bs);
-                        }
-                         */
-                        wireFormat.Marshal2NestedObject(o, dataOut, bs);
-                }
-
-
-
-                protected virtual String ReadString(BinaryReader dataIn, BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-                                if (bs.ReadBoolean()) {
-                                        int size = dataIn.ReadInt16();
-                                        byte[] data = new byte[size];
-                                        dataIn.Read(data, 0, size);
-                                        char[] text = new char[size];
-                                        for (int i = 0; i < size; i++) {
-                                                text[i] = (char) data[i];
-                                        }
-                                        return new String(text);
-                                } else {
-                                        return dataIn.ReadString();
-                                }
-                        } else {
-                                return null;
-                        }
-                }
-
-                protected virtual int WriteString(String value, BooleanStream bs) {
-                        bs.WriteBoolean(value != null);
-                        if (value != null) {
-                                int strlen = value.Length;
-                                int utflen = 0;
-                                int c = 0;
-                                bool isOnlyAscii = true;
-                                char[] charr = value.ToCharArray();
-                                for (int i = 0; i < strlen; i++) {
-                                        c = charr[i];
-                                        if ((c >= 0x0001) && (c <= 0x007F)) {
-                                                utflen++;
-                                        } else if (c > 0x07FF) {
-                                                utflen += 3;
-                                                isOnlyAscii = false;
-                                        } else {
-                                                isOnlyAscii = false;
-                                                utflen += 2;
-                                        }
-                                }
-
-                                if (utflen >= Int16.MaxValue)
-                                        throw new IOException("Encountered a String value that is too long to encode.");
-
-                                bs.WriteBoolean(isOnlyAscii);
-                                return utflen + 2;
-                        } else {
-                                return 0;
-                        }
-                }
-
-                protected virtual void WriteString(String value, BinaryWriter dataOut, BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-                                // If we verified it only holds ascii values
-                                if (bs.ReadBoolean()) {
-                                        dataOut.Write((short) value.Length);
-                                        dataOut.Write(value);
-                                } else {
-                                        dataOut.Write(value);
-                                }
-                        }
-                }
-
-                protected virtual int MarshalObjectArray(
-                        OpenWireFormat wireFormat,
-                        DataStructure[] objects,
-                        BooleanStream bs) {
-                        if (objects != null) {
-                                int rc = 0;
-                                bs.WriteBoolean(true);
-                                rc += 2;
-                                for (int i = 0; i < objects.Length; i++) {
-                                        rc += Marshal1NestedObject(wireFormat, objects[i], bs);
-                                }
-                                return rc;
-                        } else {
-                                bs.WriteBoolean(false);
-                                return 0;
-                        }
-                }
-
-                protected virtual void MarshalObjectArray(
-                        OpenWireFormat wireFormat,
-                        DataStructure[] objects,
-                        BinaryWriter dataOut,
-                        BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-                                dataOut.Write((short) objects.Length);
-                                for (int i = 0; i < objects.Length; i++) {
-                                        Marshal2NestedObject(wireFormat, objects[i], dataOut, bs);
-                                }
-                        }
-                }
-
-                protected virtual byte[] ReadBytes(BinaryReader dataIn, bool flag) {
-                        if (flag) {
-                                int size = dataIn.ReadInt32();
-                                return dataIn.ReadBytes(size);
-                        } else {
-                                return null;
-                        }
-                }
-
-                protected virtual byte[] ReadBytes(BinaryReader dataIn) {
-                        int size = dataIn.ReadInt32();
-                        return dataIn.ReadBytes(size);
-                }
-
-                protected virtual byte[] ReadBytes(BinaryReader dataIn, int size) {
-                        return dataIn.ReadBytes(size);
-                }
-
-                protected virtual void WriteBytes(byte[] command, BinaryWriter dataOut) {
-                        dataOut.Write(command.Length);
-                        dataOut.Write(command);
-                }
-
-                protected virtual BrokerError UnmarshalBrokerError(
-                        OpenWireFormat wireFormat,
-                        BinaryReader dataIn,
-                        BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-                                String clazz = ReadString(dataIn, bs);
-                                String message = ReadString(dataIn, bs);
-                                
-                                BrokerError answer = new BrokerError();
-                                answer.ExceptionClass = clazz;
-                                answer.Message = message;
-                                return answer;
-                        } else {
-                                return null;
-                        }
-                }
-
-                protected int MarshalBrokerError(OpenWireFormat wireFormat, BrokerError o, BooleanStream bs) {
-                        if (o == null) {
-                                bs.WriteBoolean(false);
-                                return 0;
-                        } else {
-                                int rc = 0;
-                                bs.WriteBoolean(true);
-                                rc += WriteString(o.ExceptionClass, bs);
-                                rc += WriteString(o.Message, bs);
-                                return rc;
-                        }
-                }
-
-                protected void MarshalBrokerError(
-                        OpenWireFormat wireFormat,
-                        BrokerError o,
-                        BinaryWriter dataOut,
-                        BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-                                WriteString(o.ExceptionClass, dataOut, bs);
-                                WriteString(o.Message, dataOut, bs);
-                        }
-                }
+namespace OpenWire.Client.Core
+{
+    /// <summary>
+    /// A base class with useful implementation inheritence methods
+    /// for creating marshallers of the OpenWire protocol
+    /// </summary>
+    public abstract class DataStreamMarshaller
+    {
+        
+        public abstract DataStructure CreateObject();
+        public abstract byte GetDataStructureType();
+        
+        public virtual int Marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs)
+        {
+            return 0;
+        }
+        public virtual void Marshal2(
+            OpenWireFormat wireFormat,
+            Object o,
+            BinaryWriter dataOut,
+            BooleanStream bs)
+        {
+        }
+        
+        public virtual void Unmarshal(
+            OpenWireFormat wireFormat,
+            Object o,
+            BinaryReader dataIn,
+            BooleanStream bs)
+        {
+        }
+        
+        
+        protected virtual DataStructure UnmarshalNestedObject(
+            OpenWireFormat wireFormat,
+            BinaryReader dataIn,
+            BooleanStream bs)
+        {
+            return wireFormat.UnmarshalNestedObject(dataIn, bs);
+        }
+        
+        protected virtual int Marshal1NestedObject(
+            OpenWireFormat wireFormat,
+            DataStructure o,
+            BooleanStream bs)
+        {
+            return wireFormat.Marshal1NestedObject(o, bs);
+        }
+        
+        protected virtual void Marshal2NestedObject(
+            OpenWireFormat wireFormat,
+            DataStructure o,
+            BinaryWriter dataOut,
+            BooleanStream bs)
+        {
+            wireFormat.Marshal2NestedObject(o, dataOut, bs);
+        }
+        
+        protected virtual DataStructure UnmarshalCachedObject(
+            OpenWireFormat wireFormat,
+            BinaryReader dataIn,
+            BooleanStream bs)
+        {
+            /*
+             if (wireFormat.isCacheEnabled()) {
+             if (bs.ReadBoolean()) {
+             short index = dataInReadShort(dataIn)Int16();
+             DataStructure value = wireFormat.UnmarshalNestedObject(dataIn, bs);
+             wireFormat.setInUnmarshallCache(index, value);
+             return value;
+             } else {
+             short index = ReadShort(dataIn);
+             return wireFormat.getFromUnmarshallCache(index);
+             }
+             } else {
+             return wireFormat.UnmarshalNestedObject(dataIn, bs);
+             }
+             */
+            return wireFormat.UnmarshalNestedObject(dataIn, bs);
+        }
+        
+        protected virtual int Marshal1CachedObject(
+            OpenWireFormat wireFormat,
+            DataStructure o,
+            BooleanStream bs)
+        {
+            /*
+             if (wireFormat.isCacheEnabled()) {
+             Short index = wireFormat.getMarshallCacheIndex(o);
+             bs.WriteBoolean(index == null);
+             if (index == null) {
+             int rc = wireFormat.Marshal1NestedObject(o, bs);
+             wireFormat.addToMarshallCache(o);
+             return 2 + rc;
+             } else {
+             return 2;
+             }
+             } else {
+             return wireFormat.Marshal1NestedObject(o, bs);
+             }
+             */
+            return wireFormat.Marshal1NestedObject(o, bs);
+        }
+        
+        protected virtual void Marshal2CachedObject(
+            OpenWireFormat wireFormat,
+            DataStructure o,
+            BinaryWriter dataOut,
+            BooleanStream bs)
+        {
+            /*
+             if (wireFormat.isCacheEnabled()) {
+             Short index = wireFormat.getMarshallCacheIndex(o);
+             if (bs.ReadBoolean()) {
+             WriteShort(index.shortValue(), dataOut);
+             wireFormat.Marshal2NestedObject(o, dataOut, bs);
+             } else {
+             WriteShort(index.shortValue(), dataOut);
+             }
+             } else {
+             wireFormat.Marshal2NestedObject(o, dataOut, bs);
+             }
+             */
+            wireFormat.Marshal2NestedObject(o, dataOut, bs);
+        }
+        
+        
+        
+        protected virtual String ReadString(BinaryReader dataIn, BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                if (bs.ReadBoolean())
+                {
+                    int size = ReadShort(dataIn);
+                    byte[] data = new byte[size];
+                    dataIn.Read(data, 0, size);
+                    char[] text = new char[size];
+                    for (int i = 0; i < size; i++)
+                    {
+                        text[i] = (char) data[i];
+                    }
+                    return new String(text);
+                }
+                else
+                {
+                    return dataIn.ReadString();
+                }
+            }
+            else
+            {
+                return null;
+            }
+        }
+        
+        protected virtual int WriteString(String value, BooleanStream bs)
+        {
+            bs.WriteBoolean(value != null);
+            if (value != null)
+            {
+                int strlen = value.Length;
+
+                // TODO until we get UTF8 working, lets just force ASCII
+                bs.WriteBoolean(true);
+                return strlen + 2;
+                
+                
                 /*
-                protected virtual ActiveMQDestination ReadDestination(BinaryReader dataIn) {
-                        return (ActiveMQDestination) CommandMarshallerRegistry.ReadCommand(dataIn);
-                }
-
-                protected virtual void WriteDestination(ActiveMQDestination command, BinaryWriter dataOut) {
-                        CommandMarshallerRegistry.WriteCommand(command, dataOut);
-                }
-
-                protected virtual BrokerId[] ReadBrokerIds(BinaryReader dataIn) {
-                        int size = dataIn.ReadInt32();
-                        BrokerId[] answer = new BrokerId[size];
-                        for (int i = 0; i < size; i++) {
-                                answer[i] = (BrokerId) CommandMarshallerRegistry.BrokerIdMarshaller.ReadCommand(dataIn);
-                        }
-                        return answer;
-                }
-
-                protected virtual void WriteBrokerIds(BrokerId[] commands, BinaryWriter dataOut) {
-                        int size = commands.Length;
-                        dataOut.Write(size);
-                        for (int i = 0; i < size; i++) {
-                                CommandMarshallerRegistry.BrokerIdMarshaller.WriteCommand(commands[i], dataOut);
-                        }
+                int utflen = 0;
+                int c = 0;
+                bool isOnlyAscii = true;
+                char[] charr = value.ToCharArray();
+                for (int i = 0; i < strlen; i++)
+                {
+                    c = charr[i];
+                    if ((c >= 0x0001) && (c <= 0x007F))
+                    {
+                        utflen++;
+                    }
+                    else if (c > 0x07FF)
+                    {
+                        utflen += 3;
+                        isOnlyAscii = false;
+                    }
+                    else
+                    {
+                        isOnlyAscii = false;
+                        utflen += 2;
+                    }
+                }
+                
+                if (utflen >= Int16.MaxValue)
+                    throw new IOException("Encountered a String value that is too long to encode.");
+                
+                bs.WriteBoolean(isOnlyAscii);
+                return utflen + 2;
+                */
+            }
+            else
+            {
+                return 0;
+            }
+        }
+        
+        public static void WriteString(String value, BinaryWriter dataOut, BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                // If we verified it only holds ascii values
+                if (bs.ReadBoolean())
+                {
+                    WriteShort((short) value.Length, dataOut);
+                    // now lets write the bytes
+                    char[] chars = value.ToCharArray();
+                    for (int i = 0; i < chars.Length; i++) {
+                        WriteByte((byte) chars[i], dataOut);
+                    }
+                }
+                else
+                {
+                    // TODO how should we properly write a String so that Java will grok it???
+                    dataOut.Write(value);
                 }
-
-
-                protected virtual BrokerInfo[] ReadBrokerInfos(BinaryReader dataIn) {
-                        int size = dataIn.ReadInt32();
-                        BrokerInfo[] answer = new BrokerInfo[size];
-                        for (int i = 0; i < size; i++) {
-                                answer[i] = (BrokerInfo) CommandMarshallerRegistry
-                                        .BrokerInfoMarshaller
-                                        .ReadCommand(dataIn);
-                        }
-                        return answer;
+            }
+        }
+        
+        public static byte ReadByte(BinaryReader dataIn)
+        {
+            return dataIn.ReadByte();
+        }
+        
+        public static char ReadChar(BinaryReader dataIn)
+        {
+            return (char) ReadShort(dataIn);
+        }
+        
+        public static short ReadShort(BinaryReader dataIn)
+        {
+            return SwitchEndian(dataIn.ReadInt16());
+        }
+        
+        public static int ReadInt(BinaryReader dataIn)
+        {
+            return SwitchEndian(dataIn.ReadInt32());
+        }
+        
+        public static long ReadLong(BinaryReader dataIn)
+        {
+            return SwitchEndian(dataIn.ReadInt64());
+        }
+        
+        public static void WriteByte(byte value, BinaryWriter dataOut)
+        {
+            dataOut.Write(value);
+        }
+        
+        public static void WriteChar(char value, BinaryWriter dataOut)
+        {
+            dataOut.Write(SwitchEndian(value));
+        }
+        
+        public static void WriteShort(short value, BinaryWriter dataOut)
+        {
+            dataOut.Write(SwitchEndian(value));
+        }
+        
+        public static void WriteInt(int value, BinaryWriter dataOut)
+        {
+            dataOut.Write(SwitchEndian(value));
+        }
+        
+        /// <summary>
+        /// Switches from one endian to the other
+        /// </summary>
+        /// <param name="value">An int</param>
+        /// <returns>An int</retutns>
+        public static int SwitchEndian(int x)
+        {
+            return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | (x >> 24));
+        }
+        
+        public static short SwitchEndian(short x)
+        {
+            int low = x & 0xff;
+            int high = x & 0xff00;
+            return(short)(high >> 8 | low << 8);
+        }
+        
+        public static long SwitchEndian(long x)
+        {
+            long answer = 0;
+            for (int i = 0; i < 8; i++) {
+                long lowest = x & 0xff;
+                x >>= 8;
+                answer <<= 8;
+                answer += lowest;
+            }
+            return answer;
+        }
+        
+        public static void WriteLong(long value, BinaryWriter dataOut)
+        {
+            dataOut.Write(IPAddress.HostToNetworkOrder(value));
+        }
+        
+        public virtual int Marshal1Long(OpenWireFormat wireFormat, long o, BooleanStream bs)
+        {
+            if (o == 0L)
+            {
+                bs.WriteBoolean(false);
+                bs.WriteBoolean(false);
+                return 0;
+            }
+            else
+            {
+                ulong ul = (ulong) o;
+                if ((ul & 0xFFFFFFFFFFFF0000ul) == 0L)
+                {
+                    bs.WriteBoolean(false);
+                    bs.WriteBoolean(true);
+                    return 2;
+                }
+                else if ((ul & 0xFFFFFFFF00000000ul) == 0L)
+                {
+                    bs.WriteBoolean(true);
+                    bs.WriteBoolean(false);
+                    return 4;
+                }
+                else
+                {
+                    bs.WriteBoolean(true);
+                    bs.WriteBoolean(true);
+                    return 8;
                 }
-
-                protected virtual void WriteBrokerInfos(BrokerInfo[] commands, BinaryWriter dataOut) {
-                        int size = commands.Length;
-                        dataOut.Write(size);
-                        for (int i = 0; i < size; i++) {
-                                CommandMarshallerRegistry.BrokerInfoMarshaller.WriteCommand(commands[i], dataOut);
-                        }
+            }
+        }
+        
+        public virtual void Marshal2Long(
+            OpenWireFormat wireFormat,
+            long o,
+            BinaryWriter dataOut,
+            BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                if (bs.ReadBoolean())
+                {
+                    WriteLong(o, dataOut);
+                }
+                else
+                {
+                    WriteInt((int) o, dataOut);
+                }
+            }
+            else
+            {
+                if (bs.ReadBoolean())
+                {
+                    WriteShort((short) o, dataOut);
                 }
-
-
-                protected virtual DataStructure[] ReadDataStructures(BinaryReader dataIn) {
-                        int size = dataIn.ReadInt32();
-                        DataStructure[] answer = new DataStructure[size];
-                        for (int i = 0; i < size; i++) {
-                                answer[i] = (DataStructure) CommandMarshallerRegistry.ReadCommand(dataIn);
-                        }
-                        return answer;
+            }
+        }
+        public virtual long UnmarshalLong(OpenWireFormat wireFormat, BinaryReader dataIn, BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                if (bs.ReadBoolean())
+                {
+                    return ReadLong(dataIn);
+                }
+                else
+                {
+                    return ReadInt(dataIn);
+                }
+            }
+            else
+            {
+                if (bs.ReadBoolean())
+                {
+                    return ReadShort(dataIn);
+                }
+                else
+                {
+                    return 0;
                 }
-
-                protected virtual void WriteDataStructures(DataStructure[] commands, BinaryWriter dataOut) {
-                        int size = commands.Length;
-                        dataOut.Write(size);
-                        for (int i = 0; i < size; i++) {
-                                CommandMarshallerRegistry.WriteCommand((Command) commands[i], dataOut);
-                        }
+            }
+        }
+        protected virtual int MarshalObjectArray(
+            OpenWireFormat wireFormat,
+            DataStructure[] objects,
+            BooleanStream bs)
+        {
+            if (objects != null)
+            {
+                int rc = 0;
+                bs.WriteBoolean(true);
+                rc += 2;
+                for (int i = 0; i < objects.Length; i++)
+                {
+                    rc += Marshal1NestedObject(wireFormat, objects[i], bs);
+                }
+                return rc;
+            }
+            else
+            {
+                bs.WriteBoolean(false);
+                return 0;
+            }
+        }
+        
+        protected virtual void MarshalObjectArray(
+            OpenWireFormat wireFormat,
+            DataStructure[] objects,
+            BinaryWriter dataOut,
+            BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                WriteShort((short) objects.Length, dataOut);
+                for (int i = 0; i < objects.Length; i++)
+                {
+                    Marshal2NestedObject(wireFormat, objects[i], dataOut, bs);
                 }
-                 */
+            }
+        }
+        
+        protected virtual byte[] ReadBytes(BinaryReader dataIn, bool flag)
+        {
+            if (flag)
+            {
+                int size = ReadInt(dataIn);
+                return dataIn.ReadBytes(size);
+            }
+            else
+            {
+                return null;
+            }
+        }
+        
+        protected virtual byte[] ReadBytes(BinaryReader dataIn)
+        {
+            int size = ReadInt(dataIn);
+            return dataIn.ReadBytes(size);
+        }
+        
+        protected virtual byte[] ReadBytes(BinaryReader dataIn, int size)
+        {
+            return dataIn.ReadBytes(size);
+        }
+        
+        protected virtual void WriteBytes(byte[] command, BinaryWriter dataOut)
+        {
+            WriteInt(command.Length, dataOut);
+            dataOut.Write(command);
+        }
+        
+        protected virtual BrokerError UnmarshalBrokerError(
+            OpenWireFormat wireFormat,
+            BinaryReader dataIn,
+            BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                String clazz = ReadString(dataIn, bs);
+                String message = ReadString(dataIn, bs);
+                
+                BrokerError answer = new BrokerError();
+                answer.ExceptionClass = clazz;
+                answer.Message = message;
+                return answer;
+            }
+            else
+            {
+                return null;
+            }
+        }
+        
+        protected int MarshalBrokerError(OpenWireFormat wireFormat, BrokerError o, BooleanStream bs)
+        {
+            if (o == null)
+            {
+                bs.WriteBoolean(false);
+                return 0;
+            }
+            else
+            {
+                int rc = 0;
+                bs.WriteBoolean(true);
+                rc += WriteString(o.ExceptionClass, bs);
+                rc += WriteString(o.Message, bs);
+                return rc;
+            }
+        }
+        
+        protected void MarshalBrokerError(
+            OpenWireFormat wireFormat,
+            BrokerError o,
+            BinaryWriter dataOut,
+            BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                WriteString(o.ExceptionClass, dataOut, bs);
+                WriteString(o.Message, dataOut, bs);
+            }
         }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs Thu Feb 23 10:16:15 2006
@@ -4,52 +4,67 @@
 using OpenWire.Client;
 using OpenWire.Client.Commands;
 
-namespace OpenWire.Client.Core {
-        /// <summary>
-        /// Handles asynchronous responses
-        /// </summary>
-        public class FutureResponse : IAsyncResult {
-
-                private Response response;
-                private Mutex asyncWaitHandle = new Mutex();
-                private Object semaphore = new Object();
-                private int maxWait = 3000;
-                private bool isCompleted;
-
-                public WaitHandle AsyncWaitHandle {
-                        get { return asyncWaitHandle; } 
+namespace OpenWire.Client.Core
+{
+    /// <summary>
+    /// Handles asynchronous responses
+    /// </summary>
+    public class FutureResponse : IAsyncResult
+    {
+        
+        private Response response;
+        private Mutex asyncWaitHandle = new Mutex();
+        private Object semaphore = new Object();
+        private int maxWait = 3000;
+        private bool isCompleted;
+        
+        public WaitHandle AsyncWaitHandle
+        {
+            get { return asyncWaitHandle; }
+        }
+        
+        public object AsyncState
+        {
+            get { return response; }
+            set { Response = (Response) value; }
+        }
+        
+        public bool IsCompleted
+        {
+            get { return isCompleted; }
+        }
+        
+        public bool CompletedSynchronously
+        {
+            get { return false; }
+        }
+        
+        public Response Response
+        {
+            // Blocks the caller until a value has been set
+            get {
+                while (response == null)
+                {
+                    try {
+                    lock (semaphore)
+                    {
+                        Monitor.Wait(semaphore, maxWait);
+                    }
+                    }
+                    catch (Exception e) {
+                        Console.WriteLine("Caught while waiting on monitor: " + e);
+                    }
                 }
-
-                public object AsyncState {
-                        get { return response; }
-                        set { Response = (Response) value; } 
+                return response;
+            }
+            set {
+                lock (semaphore)
+                {
+                    response = value;
+                    isCompleted = true;
+                    Monitor.PulseAll(semaphore);
                 }
-
-                public bool IsCompleted {
-                        get { return isCompleted; } 
-                }
-
-                public bool CompletedSynchronously {
-                        get { return false; } 
-                }
-
-                public Response Response {
-                        // Blocks the caller until a value has been set
-                        get {
-                                lock (semaphore) {
-                                        while (response == null) {
-                                                Monitor.Wait(semaphore, maxWait); 
-                                        }
-                                        return response; 
-                                } 
-                        }
-                        set {
-                                lock (semaphore) {
-                                        response = value;
-                                        isCompleted = true;
-                                        Monitor.PulseAll(semaphore); 
-                                } 
-                        }
-                } 
-        } 
+            }
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs Thu Feb 23 10:16:15 2006
@@ -5,129 +5,166 @@
 using OpenWire.Client.Core;
 using OpenWire.Client.IO;
 
-namespace OpenWire.Client.Core {
-        /// <summary>
-        /// Represents the wire format
-        /// </summary>
-        public class OpenWireFormat {
-                private DataStreamMarshaller[] dataMarshallers;
-                private const byte NULL_TYPE = 0;
-
-
-                public void addMarshaller(DataStreamMarshaller marshaller) 
+namespace OpenWire.Client.Core
+{
+    /// <summary>
+    /// Represents the wire format
+    /// </summary>
+    public class OpenWireFormat
+    {
+        private DataStreamMarshaller[] dataMarshallers;
+        private const byte NULL_TYPE = 0;
+        
+        
+        public OpenWireFormat()
+        {
+            dataMarshallers = new DataStreamMarshaller[256];
+            MarshallerFactory factory = new MarshallerFactory();
+            factory.configure(this);
+        }
+        
+        public void addMarshaller(DataStreamMarshaller marshaller)
+        {
+            byte type = marshaller.GetDataStructureType();
+            dataMarshallers[type & 0xFF] = marshaller;
+        }
+        
+        public void Marshal(Object o, BinaryWriter ds)
+        {
+            int size = 1;
+            if (o != null)
+            {
+                DataStructure c = (DataStructure) o;
+                byte type = c.GetDataStructureType();
+                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+                if (dsm == null)
+                    throw new IOException("Unknown data type: " + type);
+                
+                BooleanStream bs = new BooleanStream();
+                size += dsm.Marshal1(this, c, bs);
+                size += bs.MarshalledSize();
+                
+                DataStreamMarshaller.WriteInt(size, ds);
+                DataStreamMarshaller.WriteByte(type, ds);
+                bs.Marshal(ds);
+                dsm.Marshal2(this, c, ds, bs);
+            }
+            else
+            {
+                DataStreamMarshaller.WriteInt(size, ds);
+                DataStreamMarshaller.WriteByte(NULL_TYPE, ds);
+            }
+        }
+        
+        public Object Unmarshal(BinaryReader dis)
+        {
+            int size = DataStreamMarshaller.ReadInt(dis);
+            byte dataType = DataStreamMarshaller.ReadByte(dis);
+            if (dataType != NULL_TYPE)
+            {
+                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+                if (dsm == null)
+                    throw new IOException("Unknown data type: " + dataType);
+                Console.WriteLine("Parsing type: " + dataType + " with: " + dsm);
+                Object data = dsm.CreateObject();
+                BooleanStream bs = new BooleanStream();
+                bs.Unmarshal(dis);
+                dsm.Unmarshal(this, data, dis, bs);
+                return data;
+            }
+            else
+            {
+                return null;
+            }
+        }
+        
+        public int Marshal1NestedObject(DataStructure o, BooleanStream bs)
+        {
+            bs.WriteBoolean(o != null);
+            if (o == null)
+                return 0;
+            
+            if (o.IsMarshallAware())
+            {
+                MarshallAware ma = (MarshallAware) o;
+                byte[] sequence = ma.GetMarshalledForm(this);
+                bs.WriteBoolean(sequence != null);
+                if (sequence != null)
                 {
-                        dataMarshallers[marshaller.GetDataStructureType()] = marshaller;
+                    return 1 + sequence.Length;
                 }
+            }
+            
+            byte type = o.GetDataStructureType();
+            if (type == 0) {
+                throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
+            }
+            DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+            if (dsm == null)
+                throw new IOException("Unknown data type: " + type);
+            Console.WriteLine("Marshalling type: " + type + " with structure: " + o);
+            return 1 + dsm.Marshal1(this, o, bs);
+        }
+        
+        public void Marshal2NestedObject(DataStructure o, BinaryWriter ds, BooleanStream bs)
+        {
+            if (!bs.ReadBoolean())
+                return ;
+            
+            byte type = o.GetDataStructureType();
+            DataStreamMarshaller.WriteByte(type, ds);
+            
+            if (o.IsMarshallAware() && bs.ReadBoolean())
+            {
+                MarshallAware ma = (MarshallAware) o;
+                byte[] sequence = ma.GetMarshalledForm(this);
+                ds.Write(sequence, 0, sequence.Length);
+            }
+            else
+            {
                 
-                public void Marshal(Object o, BinaryWriter ds) {
-                        int size = 1;
-                        if (o != null) {
-                                DataStructure c = (DataStructure) o;
-                                byte type = c.GetDataStructureType();
-                                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
-                                if (dsm == null)
-                                        throw new IOException("Unknown data type: " + type);
-
-                                BooleanStream bs = new BooleanStream();
-                                size += dsm.Marshal1(this, c, bs);
-                                size += bs.MarshalledSize();
-
-                                ds.Write(size);
-                                ds.Write(type);
-                                bs.Marshal(ds);
-                                dsm.Marshal2(this, c, ds, bs);
-                        } else {
-                                ds.Write(size);
-                                ds.Write(NULL_TYPE);
-                        }
-                }
-
-                public Object Unmarshal(BinaryReader dis) {
-                        byte dataType = dis.ReadByte();
-                        if (dataType != NULL_TYPE) {
-                                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
-                                if (dsm == null)
-                                        throw new IOException("Unknown data type: " + dataType);
-                                Object data = dsm.CreateObject();
-                                BooleanStream bs = new BooleanStream();
-                                bs.Unmarshal(dis);
-                                dsm.Unmarshal(this, data, dis, bs);
-                                return data;
-                        } else {
-                                return null;
-                        }
-                }
-
-                public int Marshal1NestedObject(DataStructure o, BooleanStream bs) {
-                        bs.WriteBoolean(o != null);
-                        if (o == null)
-                                return 0;
-
-                        if (o.IsMarshallAware()) {
-                                MarshallAware ma = (MarshallAware) o;
-                                byte[] sequence = ma.GetMarshalledForm(this);
-                                bs.WriteBoolean(sequence != null);
-                                if (sequence != null) {
-                                        return 1 + sequence.Length;
-                                }
-                        }
-
-                        byte type = o.GetDataStructureType();
-                        DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
-                        if (dsm == null)
-                                throw new IOException("Unknown data type: " + type);
-                        return 1 + dsm.Marshal1(this, o, bs);
-                }
-
-                public void Marshal2NestedObject(DataStructure o, BinaryWriter ds, BooleanStream bs) {
-                        if (!bs.ReadBoolean())
-                                return ;
-
-                        byte type = o.GetDataStructureType();
-                        ds.Write(type);
-
-                        if (o.IsMarshallAware() && bs.ReadBoolean()) {
-                                MarshallAware ma = (MarshallAware) o;
-                                byte[] sequence = ma.GetMarshalledForm(this);
-                                ds.Write(sequence, 0, sequence.Length);
-                        } else {
-
-                                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
-                                if (dsm == null)
-                                        throw new IOException("Unknown data type: " + type);
-                                dsm.Marshal2(this, o, ds, bs);
-                        }
+                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+                if (dsm == null)
+                    throw new IOException("Unknown data type: " + type);
+                dsm.Marshal2(this, o, ds, bs);
+            }
+        }
+        
+        public DataStructure UnmarshalNestedObject(BinaryReader dis, BooleanStream bs)
+        {
+            if (bs.ReadBoolean())
+            {
+                
+                byte dataType = DataStreamMarshaller.ReadByte(dis);
+                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+                if (dsm == null)
+                    throw new IOException("Unknown data type: " + dataType);
+                DataStructure data = dsm.CreateObject();
+                
+                if (data.IsMarshallAware() && bs.ReadBoolean())
+                {
+                    DataStreamMarshaller.ReadInt(dis);
+                    DataStreamMarshaller.ReadByte(dis);
+                    
+                    BooleanStream bs2 = new BooleanStream();
+                    bs2.Unmarshal(dis);
+                    dsm.Unmarshal(this, data, dis, bs2);
+                    
+                    // TODO: extract the sequence from the dis and associate it.
+                    //                MarshallAware ma = (MarshallAware)data
+                    //                ma.setCachedMarshalledForm(this, sequence);
                 }
-
-                public DataStructure UnmarshalNestedObject(BinaryReader dis, BooleanStream bs) {
-                        if (bs.ReadBoolean()) {
-
-                                byte dataType = dis.ReadByte();
-                                DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
-                                if (dsm == null)
-                                        throw new IOException("Unknown data type: " + dataType);
-                                DataStructure data = dsm.CreateObject();
-
-                                if (data.IsMarshallAware() && bs.ReadBoolean()) {
-
-                                        dis.ReadInt32();
-                                        dis.ReadByte();
-
-                                        BooleanStream bs2 = new BooleanStream();
-                                        bs2.Unmarshal(dis);
-                                        dsm.Unmarshal(this, data, dis, bs2);
-
-                                        // TODO: extract the sequence from the dis and associate it.
-                                        //                MarshallAware ma = (MarshallAware)data
-                                        //                ma.setCachedMarshalledForm(this, sequence);
-                                } else {
-                                        dsm.Unmarshal(this, data, dis, bs);
-                                }
-
-                                return data;
-                        } else {
-                                return null;
-                        }
+                else
+                {
+                    dsm.Unmarshal(this, data, dis, bs);
                 }
+                
+                return data;
+            }
+            else
+            {
+                return null;
+            }
         }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs Thu Feb 23 10:16:15 2006
@@ -12,134 +12,193 @@
 using OpenWire.Client.Core;
 using OpenWire.Client.IO;
 
-namespace OpenWire.Client.Core {
-
-        /// <summary>
-        /// An implementation of ITransport that uses sockets to communicate with the broker
-        /// </summary>
-        public class SocketTransport : ITransport {
-                private readonly object transmissionLock = new object();
-                private readonly Socket socket;
-                private readonly BinaryReader socketReader;
-                private readonly BinaryWriter socketWriter;
-                private readonly Thread readThread;
-                private bool closed;
-                private IDictionary requestMap = new Hashtable(); // TODO threadsafe
-                private short nextCommandId;
-
-                public event CommandHandler Command;
-                public event ExceptionHandler Exception;
-                private OpenWireFormat wireformat = new OpenWireFormat();
-
-                public SocketTransport(string host, int port) {
-                        Console.WriteLine("Opening socket to: " + host + " on port: " + port);
-                        socket = Connect(host, port);
-                        socketWriter = new BinaryWriter(new NetworkStream(socket));
-                        socketReader = new BinaryReader(new NetworkStream(socket));
-
-                        // now lets create the background read thread
-                        readThread = new Thread(new ThreadStart(ReadLoop));
-                        readThread.Start(); 
-                }
-
-                public void Oneway(Command command) {
-                        BaseCommand baseCommand = (BaseCommand) command;
-                        baseCommand.CommandId = GetNextCommandId();
-                        baseCommand.ResponseRequired = false;
-                        Send(command); 
-                }
-
-                public FutureResponse AsyncRequest(Command command) {
-                        BaseCommand baseCommand = (BaseCommand) command;
-                        baseCommand.CommandId = GetNextCommandId();
-                        baseCommand.ResponseRequired = true;
-                        Send(command);
-                        FutureResponse future = new FutureResponse();
-                        requestMap[baseCommand.CommandId] = future;
-                        return future; 
-                }
-
-                public Response Request(Command command) {
-                        FutureResponse response = AsyncRequest(command);
-                        return response.Response; 
-                }
-
-                public void Dispose() {
-                        Console.WriteLine("Closing the socket");
-                        lock (transmissionLock) {
-                                socket.Close();
-                                closed = true; 
+namespace OpenWire.Client.Core
+{
+    
+    /// <summary>
+    /// An implementation of ITransport that uses sockets to communicate with the broker
+    /// </summary>
+    public class SocketTransport : ITransport
+    {
+        private readonly object transmissionLock = new object();
+        private readonly Socket socket;
+        private OpenWireFormat wireformat = new OpenWireFormat();
+        private readonly BinaryReader socketReader;
+        private readonly BinaryWriter socketWriter;
+        private readonly Thread readThread;
+        private bool closed;
+        private IDictionary requestMap = new Hashtable(); // TODO threadsafe
+        private short nextCommandId;
+        
+        public event CommandHandler Command;
+        public event ExceptionHandler Exception;
+        
+        public SocketTransport(string host, int port)
+        {
+            Console.WriteLine("Opening socket to: " + host + " on port: " + port);
+            socket = Connect(host, port);
+            NetworkStream networkStream = new NetworkStream(socket);
+            socketWriter = new BinaryWriter(networkStream);
+            socketReader = new BinaryReader(networkStream);
+            /*
+             socketWriter = new BinaryWriter(new NetworkStream(socket));
+             socketReader = new BinaryReader(new NetworkStream(socket));
+             */
+            
+            // now lets create the background read thread
+            readThread = new Thread(new ThreadStart(ReadLoop));
+            readThread.Start();
+        }
+        
+        public void Oneway(Command command)
+        {
+            command.CommandId = GetNextCommandId();
+            command.ResponseRequired = false;
+            Send(command);
+        }
+        
+        public FutureResponse AsyncRequest(Command command)
+        {
+            command.CommandId = GetNextCommandId();
+            command.ResponseRequired = true;
+            Send(command);
+            FutureResponse future = new FutureResponse();
+            requestMap[command.CommandId] = future;
+            return future;
+        }
+        
+        public Response Request(Command command)
+        {
+            FutureResponse response = AsyncRequest(command);
+            return response.Response;
+        }
+        
+        public void Dispose()
+        {
+            Console.WriteLine("Closing the socket");
+            lock (transmissionLock)
+            {
+                socket.Close();
+                closed = true;
+            }
+            socketWriter.Close();
+            socketReader.Close();
+        }
+        
+        public void ReadLoop()
+        {
+            Console.WriteLine("Starting to read commands from ActiveMQ");
+            while (!closed)
+            {
+                Command command = null;
+                try
+                {
+                    command = (Command) wireformat.Unmarshal(socketReader);
+                    if (command != null)
+                    {
+                        Console.WriteLine("Received command: " + command);
+                        if (command is RemoveInfo)
+                        {
+                            RemoveInfo info = (RemoveInfo) command;
+                            Console.WriteLine("Remove CommandId: " + info.CommandId);
+                            Console.WriteLine("Remove ObjectID: " + info.ObjectId);
                         }
-                        socketWriter.Close();
-                        socketReader.Close(); 
+                    }
                 }
-
-                public void ReadLoop() {
-                        Console.WriteLine("Starting to read commands from ActiveMQ");
-                        while (!closed) {
-                                BaseCommand command = null;
-                                try {
-                                        command = (BaseCommand) wireformat.Unmarshal(socketReader); 
-                                } catch (ObjectDisposedException e) {
-                                        // stream closed
-                                        break; 
-                                }
-                                if (command is Response) {
-                                        Console.WriteLine("Received response!: " + command);
-                                        Response response = (Response) command;
-                                        FutureResponse future = (FutureResponse) requestMap[response.CommandId];
-                                        if (future != null) {
-                                                if (response is ExceptionResponse) {
-                                                        ExceptionResponse er = (ExceptionResponse) response;
-                                                        if (this.Exception != null) {
-                                                                Exception e = new BrokerException(er.Exception);
-                                                                this.Exception(this, e); 
-                                                        } 
-                                                } else {
-                                                        future.Response = response; 
-                                                } 
-                                        } else {
-                                                Console.WriteLine("Unknown response ID: " + response.CommandId); 
-                                        } 
-                                } else {
-                                        if (this.Command != null) {
-                                                this.Command(this, command); 
-                                        } 
-                                } 
-                        } 
-                }
-
-
-                // Implementation methods
-
-                protected void Send(Command command) {
-                        lock (transmissionLock) {
-                                wireformat.Marshal(command, socketWriter);
-                                socketWriter.Flush(); 
-                        } 
-                }
-
-                protected short GetNextCommandId() {
-                        lock (transmissionLock) {
-                                return++nextCommandId; 
-                        } 
-                }
-
-                protected Socket Connect(string host, int port) {
-                        // Looping through the AddressList allows different type of connections to be tried 
-                        // (IPv4, IPv6 and whatever else may be available).
-                        IPHostEntry hostEntry = Dns.Resolve(host);
-                        foreach (IPAddress address in hostEntry.AddressList) {
-                                Socket socket = new Socket(
-                                        address.AddressFamily,
-                                        SocketType.Stream,
-                                        ProtocolType.Tcp);
-                                socket.Connect(new IPEndPoint(address, port));
-                                if (socket.Connected) {
-                                        return socket; 
-                                } 
+                catch (EndOfStreamException e)
+                {
+                    // stream closed
+                    break;
+                }
+                catch (ObjectDisposedException e)
+                {
+                    // stream closed
+                    break;
+                }
+                if (command is Response)
+                {
+                    Console.WriteLine("Received response!: " + command);
+                    Response response = (Response) command;
+                    FutureResponse future = (FutureResponse) requestMap[response.CommandId];
+                    if (future != null)
+                    {
+                        if (response is ExceptionResponse)
+                        {
+                            ExceptionResponse er = (ExceptionResponse) response;
+                            Exception e = new BrokerException(er.Exception);
+                            if (this.Exception != null)
+                            {
+                                this.Exception(this, e);
+                            }
+                            else
+                            {
+                                throw e;
+                            }
                         }
-                        throw new SocketException(); 
-                } 
-        } 
+                        else
+                        {
+                            future.Response = response;
+                        }
+                    }
+                    else
+                    {
+                        Console.WriteLine("Unknown response ID: " + response.CommandId);
+                    }
+                }
+                else
+                {
+                    if (this.Command != null)
+                    {
+                        this.Command(this, command);
+                    }
+                    else
+                    {
+                        Console.WriteLine("No handler available to process command: " + command);
+                    }
+                }
+            }
+        }
+        
+        
+        // Implementation methods
+        
+        protected void Send(Command command)
+        {
+            lock (transmissionLock)
+            {
+                Console.WriteLine("Sending command: " + command  + " with ID: " + command.CommandId + " response: " + command.ResponseRequired);
+                
+                wireformat.Marshal(command, socketWriter);
+                socketWriter.Flush();
+            }
+        }
+        
+        protected short GetNextCommandId()
+        {
+            lock (transmissionLock)
+            {
+                return++nextCommandId;
+            }
+        }
+        
+        protected Socket Connect(string host, int port)
+        {
+            // Looping through the AddressList allows different type of connections to be tried
+            // (IPv4, IPv6 and whatever else may be available).
+            IPHostEntry hostEntry = Dns.Resolve(host);
+            foreach (IPAddress address in hostEntry.AddressList)
+            {
+                Socket socket = new Socket(
+                    address.AddressFamily,
+                    SocketType.Stream,
+                    ProtocolType.Tcp);
+                socket.Connect(new IPEndPoint(address, port));
+                if (socket.Connected)
+                {
+                    return socket;
+                }
+            }
+            throw new SocketException();
+        }
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs Thu Feb 23 10:16:15 2006
@@ -1,21 +1,30 @@
 using System;
 using OpenWire.Client.Commands;
 
-namespace OpenWire.Client {
+namespace OpenWire.Client
+{
+    public delegate void MessageHandler(IMessage message);
+    
+    /// <summary>
+    /// A consumer of messages
+    /// </summary>
+    public interface IMessageConsumer : IDisposable
+    {
+        
         /// <summary>
-        /// A consumer of messages
+        /// Waits until a message is available and returns it
         /// </summary>
-        public interface IMessageConsumer : IDisposable {
-
-                /// <summary>
-                /// Waits until a message is available and returns it
-                /// </summary>
-                IMessage Receive();
-                
-                /// <summary>
-                /// If a message is available immediately it is returned otherwise this method returns null
-                /// </summary>
-                IMessage ReceiveNoWait();
-                
-        } 
+        IMessage Receive();
+        
+        /// <summary>
+        /// If a message is available immediately it is returned otherwise this method returns null
+        /// </summary>
+        IMessage ReceiveNoWait();
+        
+        /// <summary>
+        /// An asynchronous listener which can be used to consume messages asynchronously
+        /// </summary>
+        event MessageHandler Listener;
+        
+    }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -45,7 +45,7 @@
         base.Unmarshal(wireFormat, o, dataIn, bs);
 
         BaseCommand info = (BaseCommand)o;
-        info.CommandId = dataIn.ReadInt16();
+        info.CommandId = DataStreamMarshaller.ReadShort(dataIn);
         info.ResponseRequired = bs.ReadBoolean();
 
     }
@@ -70,7 +70,7 @@
         base.Marshal2(wireFormat, o, dataOut, bs);
 
         BaseCommand info = (BaseCommand)o;
-        dataOut.Write((short)info.CommandId);
+        DataStreamMarshaller.WriteShort(info.CommandId, dataOut);
         bs.ReadBoolean();
 
     }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -60,7 +60,7 @@
         info.BrokerURL = ReadString(dataIn, bs);
 
         if (bs.ReadBoolean()) {
-            short size = dataIn.ReadInt16();
+            short size = DataStreamMarshaller.ReadShort(dataIn);
             BrokerInfo[] value = new BrokerInfo[size];
             for( int i=0; i < size; i++ ) {
                 value[i] = (BrokerInfo) UnmarshalNestedObject(wireFormat,dataIn, bs);

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -62,7 +62,7 @@
         info.UserName = ReadString(dataIn, bs);
 
         if (bs.ReadBoolean()) {
-            short size = dataIn.ReadInt16();
+            short size = DataStreamMarshaller.ReadShort(dataIn);
             BrokerId[] value = new BrokerId[size];
             for( int i=0; i < size; i++ ) {
                 value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -59,17 +59,17 @@
         info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.Browser = bs.ReadBoolean();
         info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
-        info.PrefetchSize = dataIn.ReadInt32();
+        info.PrefetchSize = DataStreamMarshaller.ReadInt(dataIn);
         info.DispatchAsync = bs.ReadBoolean();
         info.Selector = ReadString(dataIn, bs);
         info.SubcriptionName = ReadString(dataIn, bs);
         info.NoLocal = bs.ReadBoolean();
         info.Exclusive = bs.ReadBoolean();
         info.Retroactive = bs.ReadBoolean();
-        info.Priority = dataIn.ReadByte();
+        info.Priority = DataStreamMarshaller.ReadByte(dataIn);
 
         if (bs.ReadBoolean()) {
-            short size = dataIn.ReadInt16();
+            short size = DataStreamMarshaller.ReadShort(dataIn);
             BrokerId[] value = new BrokerId[size];
             for( int i=0; i < size; i++ ) {
                 value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);
@@ -116,14 +116,14 @@
         Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs);
         bs.ReadBoolean();
         Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
-        dataOut.Write((int) info.PrefetchSize);
+        DataStreamMarshaller.WriteInt(info.PrefetchSize, dataOut);
         bs.ReadBoolean();
         WriteString(info.Selector, dataOut, bs);
         WriteString(info.SubcriptionName, dataOut, bs);
         bs.ReadBoolean();
         bs.ReadBoolean();
         bs.ReadBoolean();
-        dataOut.Write((byte) info.Priority);
+        DataStreamMarshaller.WriteByte(info.Priority, dataOut);
         MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs);
         bs.ReadBoolean();
 

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -58,7 +58,7 @@
         DataArrayResponse info = (DataArrayResponse)o;
 
         if (bs.ReadBoolean()) {
-            short size = dataIn.ReadInt16();
+            short size = DataStreamMarshaller.ReadShort(dataIn);
             DataStructure[] value = new DataStructure[size];
             for( int i=0; i < size; i++ ) {
                 value[i] = (DataStructure) UnmarshalNestedObject(wireFormat,dataIn, bs);

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -58,11 +58,11 @@
         DestinationInfo info = (DestinationInfo)o;
         info.ConnectionId = (ConnectionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
-        info.OperationType = dataIn.ReadByte();
+        info.OperationType = DataStreamMarshaller.ReadByte(dataIn);
         info.Timeout = UnmarshalLong(wireFormat, dataIn, bs);
 
         if (bs.ReadBoolean()) {
-            short size = dataIn.ReadInt16();
+            short size = DataStreamMarshaller.ReadShort(dataIn);
             BrokerId[] value = new BrokerId[size];
             for( int i=0; i < size; i++ ) {
                 value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);
@@ -100,7 +100,7 @@
         DestinationInfo info = (DestinationInfo)o;
         Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs);
         Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
-        dataOut.Write((byte) info.OperationType);
+        DataStreamMarshaller.WriteByte(info.OperationType, dataOut);
         Marshal2Long(wireFormat, info.Timeout, dataOut, bs);
         MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs);
 

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -56,7 +56,7 @@
         base.Unmarshal(wireFormat, o, dataIn, bs);
 
         IntegerResponse info = (IntegerResponse)o;
-        info.Result = dataIn.ReadInt32();
+        info.Result = DataStreamMarshaller.ReadInt(dataIn);
 
     }
 
@@ -79,7 +79,7 @@
         base.Marshal2(wireFormat, o, dataOut, bs);
 
         IntegerResponse info = (IntegerResponse)o;
-        dataOut.Write((int) info.Result);
+        DataStreamMarshaller.WriteInt(info.Result, dataOut);
 
     }
   }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -57,7 +57,7 @@
 
         JournalTransaction info = (JournalTransaction)o;
         info.TransactionId = (TransactionId) UnmarshalNestedObject(wireFormat, dataIn, bs);
-        info.Type = dataIn.ReadByte();
+        info.Type = DataStreamMarshaller.ReadByte(dataIn);
         info.WasPrepared = bs.ReadBoolean();
 
     }
@@ -84,7 +84,7 @@
 
         JournalTransaction info = (JournalTransaction)o;
         Marshal2NestedObject(wireFormat, info.TransactionId, dataOut, bs);
-        dataOut.Write((byte) info.Type);
+        DataStreamMarshaller.WriteByte(info.Type, dataOut);
         bs.ReadBoolean();
 
     }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -59,10 +59,10 @@
         info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.TransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
-        info.AckType = dataIn.ReadByte();
+        info.AckType = DataStreamMarshaller.ReadByte(dataIn);
         info.FirstMessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs);
         info.LastMessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs);
-        info.MessageCount = dataIn.ReadInt32();
+        info.MessageCount = DataStreamMarshaller.ReadInt(dataIn);
 
     }
 
@@ -93,10 +93,10 @@
         Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
         Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs);
         Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs);
-        dataOut.Write((byte) info.AckType);
+        DataStreamMarshaller.WriteByte(info.AckType, dataOut);
         Marshal2NestedObject(wireFormat, info.FirstMessageId, dataOut, bs);
         Marshal2NestedObject(wireFormat, info.LastMessageId, dataOut, bs);
-        dataOut.Write((int) info.MessageCount);
+        DataStreamMarshaller.WriteInt(info.MessageCount, dataOut);
 
     }
   }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -59,7 +59,7 @@
         info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.Message = (Message) UnmarshalNestedObject(wireFormat, dataIn, bs);
-        info.RedeliveryCounter = dataIn.ReadInt32();
+        info.RedeliveryCounter = DataStreamMarshaller.ReadInt(dataIn);
 
     }
 
@@ -88,7 +88,7 @@
         Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs);
         Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
         Marshal2NestedObject(wireFormat, info.Message, dataOut, bs);
-        dataOut.Write((int) info.RedeliveryCounter);
+        DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut);
 
     }
   }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -52,11 +52,11 @@
         info.MessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs);
         info.OriginalTransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.GroupID = ReadString(dataIn, bs);
-        info.GroupSequence = dataIn.ReadInt32();
+        info.GroupSequence = DataStreamMarshaller.ReadInt(dataIn);
         info.CorrelationId = ReadString(dataIn, bs);
         info.Persistent = bs.ReadBoolean();
         info.Expiration = UnmarshalLong(wireFormat, dataIn, bs);
-        info.Priority = dataIn.ReadByte();
+        info.Priority = DataStreamMarshaller.ReadByte(dataIn);
         info.ReplyTo = (ActiveMQDestination) UnmarshalNestedObject(wireFormat, dataIn, bs);
         info.Timestamp = UnmarshalLong(wireFormat, dataIn, bs);
         info.Type = ReadString(dataIn, bs);
@@ -65,10 +65,10 @@
         info.DataStructure = (DataStructure) UnmarshalNestedObject(wireFormat, dataIn, bs);
         info.TargetConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.Compressed = bs.ReadBoolean();
-        info.RedeliveryCounter = dataIn.ReadInt32();
+        info.RedeliveryCounter = DataStreamMarshaller.ReadInt(dataIn);
 
         if (bs.ReadBoolean()) {
-            short size = dataIn.ReadInt16();
+            short size = DataStreamMarshaller.ReadShort(dataIn);
             BrokerId[] value = new BrokerId[size];
             for( int i=0; i < size; i++ ) {
                 value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);
@@ -134,26 +134,26 @@
         Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs);
         Marshal2CachedObject(wireFormat, info.OriginalTransactionId, dataOut, bs);
         WriteString(info.GroupID, dataOut, bs);
-        dataOut.Write((int) info.GroupSequence);
+        DataStreamMarshaller.WriteInt(info.GroupSequence, dataOut);
         WriteString(info.CorrelationId, dataOut, bs);
         bs.ReadBoolean();
         Marshal2Long(wireFormat, info.Expiration, dataOut, bs);
-        dataOut.Write((byte) info.Priority);
+        DataStreamMarshaller.WriteByte(info.Priority, dataOut);
         Marshal2NestedObject(wireFormat, info.ReplyTo, dataOut, bs);
         Marshal2Long(wireFormat, info.Timestamp, dataOut, bs);
         WriteString(info.Type, dataOut, bs);
         if(bs.ReadBoolean()) {
-           dataOut.Write((int)info.Content.Length);
+           DataStreamMarshaller.WriteInt(info.Content.Length, dataOut);
            dataOut.Write(info.Content);
         }
         if(bs.ReadBoolean()) {
-           dataOut.Write((int)info.MarshalledProperties.Length);
+           DataStreamMarshaller.WriteInt(info.MarshalledProperties.Length, dataOut);
            dataOut.Write(info.MarshalledProperties);
         }
         Marshal2NestedObject(wireFormat, info.DataStructure, dataOut, bs);
         Marshal2CachedObject(wireFormat, info.TargetConsumerId, dataOut, bs);
         bs.ReadBoolean();
-        dataOut.Write((int) info.RedeliveryCounter);
+        DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut);
         MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs);
         Marshal2Long(wireFormat, info.Arrival, dataOut, bs);
         WriteString(info.UserID, dataOut, bs);

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -60,7 +60,7 @@
         info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
 
         if (bs.ReadBoolean()) {
-            short size = dataIn.ReadInt16();
+            short size = DataStreamMarshaller.ReadShort(dataIn);
             BrokerId[] value = new BrokerId[size];
             for( int i=0; i < size; i++ ) {
                 value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -56,7 +56,7 @@
         base.Unmarshal(wireFormat, o, dataIn, bs);
 
         Response info = (Response)o;
-        info.CorrelationId = dataIn.ReadInt16();
+        info.CorrelationId = DataStreamMarshaller.ReadShort(dataIn);
 
     }
 
@@ -79,7 +79,7 @@
         base.Marshal2(wireFormat, o, dataOut, bs);
 
         Response info = (Response)o;
-        dataOut.Write((short)info.CorrelationId);
+        DataStreamMarshaller.WriteShort(info.CorrelationId, dataOut);
 
     }
   }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs?rev=380183&r1=380182&r2=380183&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs Thu Feb 23 10:16:15 2006
@@ -58,7 +58,7 @@
         TransactionInfo info = (TransactionInfo)o;
         info.ConnectionId = (ConnectionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
         info.TransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
-        info.Type = dataIn.ReadByte();
+        info.Type = DataStreamMarshaller.ReadByte(dataIn);
 
     }
 
@@ -85,7 +85,7 @@
         TransactionInfo info = (TransactionInfo)o;
         Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs);
         Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs);
-        dataOut.Write((byte) info.Type);
+        DataStreamMarshaller.WriteByte(info.Type, dataOut);
 
     }
   }



Mime
View raw message