qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r886940 [6/14] - in /qpid/trunk/qpid/dotnet/client-010: ./ addins/ addins/ExcelAddIn/ addins/ExcelAddInMessageProcessor/ addins/ExcelAddInProducer/ client/ client/client/ client/transport/ client/transport/codec/ client/transport/exception/...
Date Thu, 03 Dec 2009 22:03:55 GMT
Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/Assembler.cs Thu Dec  3 22:03:51 2009
@@ -1,282 +1,282 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using org.apache.qpid.transport.codec;
-using org.apache.qpid.transport.util;
-
-namespace org.apache.qpid.transport.network
-{
-    /// <summary> 
-    /// Assembler
-    /// </summary>
-    public delegate void Processor(NetworkDelegate ndelegate);
-
-    public class Assembler : NetworkDelegate, Receiver<ReceivedPayload<ProtocolEvent>>
-    {
-        private static readonly Logger log = Logger.get(typeof (Assembler));
-        private readonly Dictionary<int, List<byte[]>> segments;
-        private readonly Method[] incomplete;
-        [ThreadStatic] static MSDecoder _decoder;
-        private readonly Object m_objectLock = new object();
-
-        // the event raised when a buffer is read from the wire        
-        public event EventHandler<ReceivedPayload<ProtocolEvent>> ReceivedEvent;
-        public event EventHandler<ExceptionArgs> ExceptionProcessing;
-        public event EventHandler HandlerClosed;
-
-        event EventHandler<ReceivedPayload<ProtocolEvent>> Receiver<ReceivedPayload<ProtocolEvent>>.Received
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    ReceivedEvent += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    ReceivedEvent -= value;
-                }
-            }
-        }
-
-        event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<ProtocolEvent>>.Exception
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    ExceptionProcessing += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    ExceptionProcessing -= value;
-                }
-            }
-        }
-
-        event EventHandler Receiver<ReceivedPayload<ProtocolEvent>>.Closed
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    HandlerClosed += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    HandlerClosed -= value;
-                }
-            }
-        }
-
-        public Assembler()
-        {
-            segments = new Dictionary<int, List<byte[]>>();
-            incomplete = new Method[64*1024];
-        }
-
-        // Invoked when a network event is received
-        public void On_ReceivedEvent(object sender, ReceivedPayload<NetworkEvent> payload)
-        {
-            payload.Payload.ProcessNetworkEvent(this);
-        }
-
-        #region Interface NetworkDelegate
-
-        public void Init(ProtocolHeader header)
-        {
-            Emit(0, header);
-        }
-
-        public void Error(ProtocolError error)
-        {
-            Emit(0, error);
-        }
-
-        public void Frame(Frame frame)
-        {
-            MemoryStream segment;
-            if (frame.isFirstFrame() && frame.isLastFrame())
-            {                
-                byte[] tmp = new byte[frame.BodySize];
-                frame.Body.Read(tmp, 0, tmp.Length);
-                segment = new MemoryStream();
-                BinaryWriter w = new BinaryWriter(segment);
-                w.Write(tmp);
-                assemble(frame, new MemoryStream(tmp));
-            }
-            else
-            {
-                List<byte[]> frames;
-                if (frame.isFirstFrame())
-                {
-                    frames = new List<byte[]>();
-                    setSegment(frame, frames);
-                }
-                else
-                {
-                    frames = getSegment(frame);
-                }
-                byte[] tmp = new byte[frame.BodySize];
-                frame.Body.Read(tmp, 0, tmp.Length);
-                frames.Add(tmp);
-
-                if (frame.isLastFrame())
-                {
-                    clearSegment(frame);
-                    segment = new MemoryStream();
-                    BinaryWriter w = new BinaryWriter(segment);
-                    foreach (byte[] f in frames)
-                    {
-                        w.Write(f);
-                    }
-                    assemble(frame, segment);
-                }
-            }
-        }
-
-        #endregion
-
-        #region Private Support Functions
-
-
-        private MSDecoder getDecoder()
-        {
-            if( _decoder == null )
-            {
-                _decoder = new MSDecoder();
-            }
-            return _decoder;
-        }
-
-        private void assemble(Frame frame, MemoryStream segment)
-        {
-            MSDecoder decoder = getDecoder();
-            decoder.init(segment);
-            int channel = frame.Channel;
-            Method command;
-            switch (frame.Type)
-            {
-                case SegmentType.CONTROL:
-                    int controlType = decoder.readUint16();                    
-                    Method control = Method.create(controlType);
-                    control.read(decoder);
-                    Emit(channel, control);
-                    break;
-                case SegmentType.COMMAND:
-                    int commandType = decoder.readUint16();
-                     // read in the session header, right now we don't use it
-                    decoder.readUint16();
-                    command = Method.create(commandType);
-                    command.read(decoder);
-                    if (command.hasPayload())
-                    {
-                        incomplete[channel] = command;
-                    }
-                    else
-                    {
-                        Emit(channel, command);
-                    }
-                    break;
-                case SegmentType.HEADER:
-                    command = incomplete[channel];
-                    List<Struct> structs = new List<Struct>();
-                    while (decoder.hasRemaining())                    
-                    {
-                        structs.Add(decoder.readStruct32());
-                    }
-                    command.Header = new Header(structs);
-                    if (frame.isLastSegment())
-                    {
-                        incomplete[channel] = null;
-                        Emit(channel, command);
-                    }
-                    break;
-                case SegmentType.BODY:
-                    command = incomplete[channel];                  
-                    segment.Seek(0, SeekOrigin.Begin);
-                    command.Body = segment;
-                    incomplete[channel] = null;
-                    Emit(channel, command);
-                    break;
-                default:
-                    throw new Exception("unknown frame type: " + frame.Type);
-            }
-        }
-
-        private int segmentKey(Frame frame)
-        {
-            return (frame.Track + 1)*frame.Channel;
-        }
-
-        private List<byte[]> getSegment(Frame frame)
-        {
-            return segments[segmentKey(frame)];
-        }
-
-        private void setSegment(Frame frame, List<byte[]> segment)
-        {
-            int key = segmentKey(frame);
-            if (segments.ContainsKey(key))
-            {
-                Error(new ProtocolError(network.Frame.L2, "segment in progress: %s",
-                                        frame));
-            }
-            segments.Add(segmentKey(frame), segment);            
-        }
-
-        private void clearSegment(Frame frame)
-        {
-            segments.Remove(segmentKey(frame));
-        }
-
-        // Emit a protocol event 
-        private void Emit(int channel, ProtocolEvent protevent)
-        {
-            protevent.Channel = channel;
-            log.debug("Assembler: protocol event:", protevent);
-            ReceivedPayload<ProtocolEvent> payload = new ReceivedPayload<ProtocolEvent>();
-            payload.Payload = protevent;
-            if (ReceivedEvent != null)
-            {
-                ReceivedEvent(this, payload);
-            }
-            else
-            {
-                log.debug("No listener for event: {0}", protevent);
-            }
-        }
-
-        #endregion
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using org.apache.qpid.transport.codec;
+using org.apache.qpid.transport.util;
+
+namespace org.apache.qpid.transport.network
+{
+    /// <summary> 
+    /// Processor: signature of a callback that is handed a NetworkDelegate and returns nothing.
+    /// </summary>
+    public delegate void Processor(NetworkDelegate ndelegate);
+
+    /// <summary>
+    /// Assembler: rebuilds segments from the frames it receives and converts
+    /// each completed segment into a protocol event, which is raised through
+    /// <see cref="ReceivedEvent"/>.
+    /// </summary>
+    public class Assembler : NetworkDelegate, Receiver<ReceivedPayload<ProtocolEvent>>
+    {
+        private static readonly Logger log = Logger.get(typeof (Assembler));
+
+        // Frame bodies collected so far for every multi-frame segment still in
+        // progress, keyed by segmentKey().
+        private readonly Dictionary<int, List<byte[]>> segments;
+
+        // Indexed by channel: a decoded command that still waits for its
+        // header and/or body segment before it can be emitted.
+        private readonly Method[] incomplete;
+
+        // One decoder per thread, created on first use by getDecoder().
+        [ThreadStatic] static MSDecoder _decoder;
+
+        // Taken by the explicit Receiver<> event accessors below.
+        private readonly Object m_objectLock = new object();
+
+        // the event raised when a buffer is read from the wire        
+        public event EventHandler<ReceivedPayload<ProtocolEvent>> ReceivedEvent;
+        public event EventHandler<ExceptionArgs> ExceptionProcessing;
+        public event EventHandler HandlerClosed;
+
+        // Receiver<>.Received forwards subscriptions to ReceivedEvent.
+        event EventHandler<ReceivedPayload<ProtocolEvent>> Receiver<ReceivedPayload<ProtocolEvent>>.Received
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    ReceivedEvent += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    ReceivedEvent -= value;
+                }
+            }
+        }
+
+        // Receiver<>.Exception forwards subscriptions to ExceptionProcessing.
+        event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<ProtocolEvent>>.Exception
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    ExceptionProcessing += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    ExceptionProcessing -= value;
+                }
+            }
+        }
+
+        // Receiver<>.Closed forwards subscriptions to HandlerClosed.
+        event EventHandler Receiver<ReceivedPayload<ProtocolEvent>>.Closed
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    HandlerClosed += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    HandlerClosed -= value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates an assembler with no segment in progress and one
+        /// pending-command slot for each of 64K channel numbers.
+        /// </summary>
+        public Assembler()
+        {
+            segments = new Dictionary<int, List<byte[]>>();
+            incomplete = new Method[64*1024];
+        }
+
+        /// <summary>
+        /// Invoked when a network event is received; the event dispatches
+        /// itself back to this instance.
+        /// </summary>
+        /// <param name="sender">Source of the event (not used here).</param>
+        /// <param name="payload">Wrapper holding the network event to process.</param>
+        public void On_ReceivedEvent(object sender, ReceivedPayload<NetworkEvent> payload)
+        {
+            payload.Payload.ProcessNetworkEvent(this);
+        }
+
+        #region Interface NetworkDelegate
+
+        /// <summary>Emits the protocol header as an event on channel 0.</summary>
+        public void Init(ProtocolHeader header)
+        {
+            Emit(0, header);
+        }
+
+        /// <summary>Emits the protocol error as an event on channel 0.</summary>
+        public void Error(ProtocolError error)
+        {
+            Emit(0, error);
+        }
+
+        /// <summary>
+        /// Consumes one frame. A frame that is both first and last is assembled
+        /// directly; otherwise its body is appended to the segment in progress
+        /// and the segment is assembled once the last frame has arrived.
+        /// </summary>
+        /// <param name="frame">The frame whose body is read and accumulated.</param>
+        public void Frame(Frame frame)
+        {
+            if (frame.isFirstFrame() && frame.isLastFrame())
+            {
+                // Whole segment in a single frame: wrap the bytes directly,
+                // no intermediate copy is needed.
+                byte[] whole = new byte[frame.BodySize];
+                frame.Body.Read(whole, 0, whole.Length);
+                assemble(frame, new MemoryStream(whole));
+                return;
+            }
+
+            List<byte[]> frames;
+            if (frame.isFirstFrame())
+            {
+                frames = new List<byte[]>();
+                setSegment(frame, frames);
+            }
+            else
+            {
+                frames = getSegment(frame);
+            }
+
+            byte[] part = new byte[frame.BodySize];
+            frame.Body.Read(part, 0, part.Length);
+            frames.Add(part);
+
+            if (frame.isLastFrame())
+            {
+                clearSegment(frame);
+                MemoryStream segment = new MemoryStream();
+                foreach (byte[] f in frames)
+                {
+                    segment.Write(f, 0, f.Length);
+                }
+                // Hand over the stream positioned at its start, like the
+                // single-frame path does. NOTE(review): MSDecoder.init may
+                // already rewind; in that case this is a no-op.
+                segment.Seek(0, SeekOrigin.Begin);
+                assemble(frame, segment);
+            }
+        }
+
+        #endregion
+
+        #region Private Support Functions
+
+
+        // Returns the calling thread's decoder, creating it on first use.
+        private MSDecoder getDecoder()
+        {
+            if( _decoder == null )
+            {
+                _decoder = new MSDecoder();
+            }
+            return _decoder;
+        }
+
+        // Decodes one complete segment according to the frame's segment type.
+        // Controls, and commands without payload, are emitted immediately.
+        // A command with payload is parked in incomplete[channel] until its
+        // header (if that is the last segment) or its body has been attached.
+        private void assemble(Frame frame, MemoryStream segment)
+        {
+            MSDecoder decoder = getDecoder();
+            decoder.init(segment);
+            int channel = frame.Channel;
+            Method command;
+            switch (frame.Type)
+            {
+                case SegmentType.CONTROL:
+                    int controlType = decoder.readUint16();                    
+                    Method control = Method.create(controlType);
+                    control.read(decoder);
+                    Emit(channel, control);
+                    break;
+                case SegmentType.COMMAND:
+                    int commandType = decoder.readUint16();
+                     // read in the session header, right now we don't use it
+                    decoder.readUint16();
+                    command = Method.create(commandType);
+                    command.read(decoder);
+                    if (command.hasPayload())
+                    {
+                        incomplete[channel] = command;
+                    }
+                    else
+                    {
+                        Emit(channel, command);
+                    }
+                    break;
+                case SegmentType.HEADER:
+                    command = incomplete[channel];
+                    List<Struct> structs = new List<Struct>();
+                    while (decoder.hasRemaining())                    
+                    {
+                        structs.Add(decoder.readStruct32());
+                    }
+                    command.Header = new Header(structs);
+                    if (frame.isLastSegment())
+                    {
+                        // No body follows, so the command is complete.
+                        incomplete[channel] = null;
+                        Emit(channel, command);
+                    }
+                    break;
+                case SegmentType.BODY:
+                    command = incomplete[channel];                  
+                    // The body is handed over as a stream rewound to its start.
+                    segment.Seek(0, SeekOrigin.Begin);
+                    command.Body = segment;
+                    incomplete[channel] = null;
+                    Emit(channel, command);
+                    break;
+                default:
+                    throw new Exception("unknown frame type: " + frame.Type);
+            }
+        }
+
+        // Key identifying the segment in progress for a (track, channel) pair.
+        // The track goes above the 16 bits used by the channel (channels index
+        // the 64K-entry 'incomplete' array), so distinct pairs never share a
+        // key. The previous (Track + 1) * Channel mapped every track on
+        // channel 0 to 0 and also collided for pairs such as (0, 2) / (1, 1).
+        private int segmentKey(Frame frame)
+        {
+            return (frame.Track << 16) | frame.Channel;
+        }
+
+        // Frames collected so far for the segment this frame belongs to.
+        // Throws KeyNotFoundException if no segment was started for it.
+        private List<byte[]> getSegment(Frame frame)
+        {
+            return segments[segmentKey(frame)];
+        }
+
+        // Registers a fresh frame list for the segment this frame starts.
+        // If a segment is already in progress under the same key, a protocol
+        // error is emitted and the stale entry is replaced; Dictionary.Add
+        // would throw on the duplicate key right after the error was reported.
+        private void setSegment(Frame frame, List<byte[]> segment)
+        {
+            int key = segmentKey(frame);
+            if (segments.ContainsKey(key))
+            {
+                // NOTE(review): "{0}" assumes ProtocolError formats its message
+                // with .NET composite formatting - confirm.
+                Error(new ProtocolError(network.Frame.L2, "segment in progress: {0}",
+                                        frame));
+            }
+            segments[key] = segment;
+        }
+
+        // Forgets the segment this frame belongs to.
+        private void clearSegment(Frame frame)
+        {
+            segments.Remove(segmentKey(frame));
+        }
+
+        // Emit a protocol event: stamp it with the channel, wrap it in a
+        // payload and raise ReceivedEvent (or log that nobody is listening).
+        private void Emit(int channel, ProtocolEvent protevent)
+        {
+            protevent.Channel = channel;
+            log.debug("Assembler: protocol event: {0}", protevent);
+            ReceivedPayload<ProtocolEvent> payload = new ReceivedPayload<ProtocolEvent>();
+            payload.Payload = protevent;
+
+            // Take a snapshot so an unsubscribe between the null check and the
+            // invocation cannot cause a NullReferenceException.
+            EventHandler<ReceivedPayload<ProtocolEvent>> handler = ReceivedEvent;
+            if (handler != null)
+            {
+                handler(this, payload);
+            }
+            else
+            {
+                log.debug("No listener for event: {0}", protevent);
+            }
+        }
+
+        #endregion
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/Disassembler.cs Thu Dec  3 22:03:51 2009
@@ -1,222 +1,222 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.IO;
-using org.apache.qpid.transport.codec;
-using org.apache.qpid.transport.util;
-
-namespace org.apache.qpid.transport.network
-{
-    /// <summary> 
-    /// Disassembler
-    /// </summary>
-    public sealed class Disassembler : Sender<ProtocolEvent>, ProtocolDelegate<Object>
-    {
-        private readonly IIOSender<MemoryStream> _sender;
-        private readonly int _maxPayload;
-        private readonly MemoryStream _header;
-        private readonly BinaryWriter _writer;
-        private readonly Object _sendlock = new Object();
-        [ThreadStatic] static MSEncoder _encoder;
-
-
-        public Disassembler(IIOSender<MemoryStream> sender, int maxFrame)
-        {
-            if (maxFrame <= Frame.HEADER_SIZE || maxFrame >= 64*1024)
-            {
-                throw new Exception(String.Format("maxFrame must be > {0} and < 64K: ", Frame.HEADER_SIZE) + maxFrame);
-            }
-            _sender = sender;
-            _maxPayload = maxFrame - Frame.HEADER_SIZE;
-            _header = new MemoryStream(Frame.HEADER_SIZE);
-            _writer = new BinaryWriter(_header);
-        }
-
-        #region Sender Interface 
-
-        public void send(ProtocolEvent pevent)
-        {
-            pevent.ProcessProtocolEvent(null, this);
-        }
-
-        public void flush()
-        {
-            lock (_sendlock)
-            {
-                _sender.flush();
-            }
-        }
-
-        public void close()
-        {
-            lock (_sendlock)
-            {
-                _sender.close();
-            }
-        }
-
-        #endregion
-
-        #region ProtocolDelegate<Object> Interface 
-
-        public void Init(Object v, ProtocolHeader header)
-        {
-            lock (_sendlock)
-            {
-                _sender.send(header.ToMemoryStream());
-                _sender.flush();
-            }
-        }
-
-        public void Control(Object v, Method method)
-        {
-            invokeMethod(method, SegmentType.CONTROL);
-        }
-
-        public void Command(Object v, Method method)
-        {
-            invokeMethod(method, SegmentType.COMMAND);
-        }
-
-        public void Error(Object v, ProtocolError error)
-        {
-            throw new Exception("Error: " + error);
-        }
-
-        #endregion
-
-        #region private 
-
-        private void frame(byte flags, byte type, byte track, int channel, int size, MemoryStream buf)
-        {
-            lock (_sendlock)
-            {
-                 _writer.Write(flags);
-                _writer.Write(type);
-                _writer.Write(ByteEncoder.GetBigEndian((UInt16)(size + Frame.HEADER_SIZE)));
-                _writer.Write((byte)0);
-                _writer.Write(track);
-                _writer.Write(ByteEncoder.GetBigEndian((UInt16)( channel)));               
-                _writer.Write((byte)0);
-                _writer.Write((byte)0);
-                _writer.Write((byte)0);
-               _writer.Write((byte)0);
-                _sender.send(_header);
-                _header.Seek(0, SeekOrigin.Begin);               
-                _sender.send(buf, size);
-            }
-        }
-
-        private void fragment(byte flags, SegmentType type, ProtocolEvent mevent, MemoryStream buf)
-        {
-            byte typeb = (byte) type;
-            byte track = mevent.EncodedTrack == Frame.L4 ? (byte) 1 : (byte) 0;
-            int remaining = (int) buf.Length;
-            buf.Seek(0, SeekOrigin.Begin);
-            bool first = true;
-            while (true)
-            {
-                int size = Math.Min(_maxPayload, remaining);
-                remaining -= size;              
-
-                byte newflags = flags;
-                if (first)
-                {
-                    newflags |= Frame.FIRST_FRAME;
-                    first = false;
-                }
-                if (remaining == 0)
-                {
-                    newflags |= Frame.LAST_FRAME;
-                }                
-
-                frame(newflags, typeb, track, mevent.Channel, size, buf);
-
-                if (remaining == 0)
-                {
-                    break;
-                }
-            }
-        }
-
-        private MSEncoder getEncoder()
-        {
-            if( _encoder == null)
-            {
-                _encoder = new MSEncoder(4 * 1024);
-            }
-            return _encoder;
-        }
-
-        private void invokeMethod(Method method, SegmentType type)
-        {
-            MSEncoder encoder = getEncoder();
-            encoder.init();
-            encoder.writeUint16(method.getEncodedType());
-            if (type == SegmentType.COMMAND)
-            {
-                if (method.Sync)
-                {
-                    encoder.writeUint16(0x0101);
-                }
-                else
-                {
-                    encoder.writeUint16(0x0100);
-                }
-            }
-            method.write(_encoder);
-            MemoryStream methodSeg = encoder.segment();
-
-            byte flags = Frame.FIRST_SEG;
-
-            bool payload = method.hasPayload();
-            if (!payload)
-            {
-                flags |= Frame.LAST_SEG;
-            }
-
-            MemoryStream headerSeg = null;
-            if (payload)
-            {
-                Header hdr = method.Header;
-                Struct[] structs = hdr.Structs;
-
-                foreach (Struct st in structs)
-                {
-                    encoder.writeStruct32(st);
-                }
-                headerSeg = encoder.segment();
-            }
-
-            lock (_sendlock)
-            {
-                fragment(flags, type, method, methodSeg);
-                if (payload)
-                {
-                    fragment( 0x0, SegmentType.HEADER, method, headerSeg);
-                    fragment(Frame.LAST_SEG, SegmentType.BODY, method, method.Body);
-                }
-            }
-        }
-
-        #endregion
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+using org.apache.qpid.transport.codec;
+using org.apache.qpid.transport.util;
+
+namespace org.apache.qpid.transport.network
+{
+    /// <summary>
+    /// Disassembler: encodes outgoing protocol events and hands them to the
+    /// underlying sender as a sequence of frames, each carrying at most
+    /// maxFrame minus the frame header size bytes of body.
+    /// </summary>
+    public sealed class Disassembler : Sender<ProtocolEvent>, ProtocolDelegate<Object>
+    {
+        // Where frame headers and bodies are written to.
+        private readonly IIOSender<MemoryStream> _sender;
+        // Largest body a single frame may carry.
+        private readonly int _maxPayload;
+        // Buffer reused for every frame header; filled through _writer.
+        private readonly MemoryStream _header;
+        private readonly BinaryWriter _writer;
+        // Held around every use of _sender and of the shared header buffer.
+        private readonly Object _sendlock = new Object();
+        // Per-thread encoder, created on first use by getEncoder().
+        [ThreadStatic] static MSEncoder _encoder;
+
+
+        /// <summary>
+        /// Creates a disassembler that writes to <paramref name="sender"/>.
+        /// </summary>
+        /// <param name="sender">Destination for the encoded frames.</param>
+        /// <param name="maxFrame">
+        /// Maximum frame size including the header; must be greater than
+        /// Frame.HEADER_SIZE and less than 64K, otherwise an Exception is thrown.
+        /// </param>
+        public Disassembler(IIOSender<MemoryStream> sender, int maxFrame)
+        {
+            bool tooSmall = maxFrame <= Frame.HEADER_SIZE;
+            bool tooLarge = maxFrame >= 64*1024;
+            if (tooSmall || tooLarge)
+            {
+                throw new Exception(String.Format("maxFrame must be > {0} and < 64K: ", Frame.HEADER_SIZE) + maxFrame);
+            }
+
+            _sender = sender;
+            _maxPayload = maxFrame - Frame.HEADER_SIZE;
+            _header = new MemoryStream(Frame.HEADER_SIZE);
+            _writer = new BinaryWriter(_header);
+        }
+
+        #region Sender Interface 
+
+        /// <summary>
+        /// Sends a protocol event by letting it dispatch itself to this
+        /// instance's ProtocolDelegate methods.
+        /// </summary>
+        public void send(ProtocolEvent pevent)
+        {
+            pevent.ProcessProtocolEvent(null, this);
+        }
+
+        /// <summary>Flushes the underlying sender while holding the send lock.</summary>
+        public void flush()
+        {
+            lock (_sendlock)
+            {
+                _sender.flush();
+            }
+        }
+
+        /// <summary>Closes the underlying sender while holding the send lock.</summary>
+        public void close()
+        {
+            lock (_sendlock)
+            {
+                _sender.close();
+            }
+        }
+
+        #endregion
+
+        #region ProtocolDelegate<Object> Interface 
+
+        /// <summary>Writes the protocol header to the sender and flushes it.</summary>
+        public void Init(Object v, ProtocolHeader header)
+        {
+            lock (_sendlock)
+            {
+                MemoryStream encodedHeader = header.ToMemoryStream();
+                _sender.send(encodedHeader);
+                _sender.flush();
+            }
+        }
+
+        /// <summary>Encodes and sends the method as a CONTROL segment.</summary>
+        public void Control(Object v, Method method)
+        {
+            invokeMethod(method, SegmentType.CONTROL);
+        }
+
+        /// <summary>Encodes and sends the method as a COMMAND segment.</summary>
+        public void Command(Object v, Method method)
+        {
+            invokeMethod(method, SegmentType.COMMAND);
+        }
+
+        /// <summary>Protocol errors are not sent; this always throws.</summary>
+        public void Error(Object v, ProtocolError error)
+        {
+            throw new Exception("Error: " + error);
+        }
+
+        #endregion
+
+        #region private 
+
+        // Writes one frame: a header built in the shared _header buffer,
+        // followed by 'size' bytes taken from 'buf'.
+        private void frame(byte flags, byte type, byte track, int channel, int size, MemoryStream buf)
+        {
+            lock (_sendlock)
+            {
+                UInt16 frameSize = (UInt16)(size + Frame.HEADER_SIZE);
+                UInt16 channelId = (UInt16)( channel);
+
+                _writer.Write(flags);
+                _writer.Write(type);
+                _writer.Write(ByteEncoder.GetBigEndian(frameSize));
+                _writer.Write((byte)0);
+                _writer.Write(track);
+                _writer.Write(ByteEncoder.GetBigEndian(channelId));
+                // four trailing zero bytes close the header
+                for (int pad = 0; pad < 4; pad++)
+                {
+                    _writer.Write((byte)0);
+                }
+
+                _sender.send(_header);
+                // rewind so the next frame overwrites this header
+                _header.Seek(0, SeekOrigin.Begin);
+                _sender.send(buf, size);
+            }
+        }
+
+        // Splits 'buf' into frames of at most _maxPayload bytes. The first
+        // frame gets FIRST_FRAME and the final one LAST_FRAME on top of the
+        // given segment flags. At least one frame is sent, even for an empty
+        // buffer.
+        private void fragment(byte flags, SegmentType type, ProtocolEvent mevent, MemoryStream buf)
+        {
+            byte segmentType = (byte) type;
+            byte track = mevent.EncodedTrack == Frame.L4 ? (byte) 1 : (byte) 0;
+            int left = (int) buf.Length;
+            buf.Seek(0, SeekOrigin.Begin);
+
+            bool isFirst = true;
+            do
+            {
+                int chunk = Math.Min(_maxPayload, left);
+                left -= chunk;
+
+                byte frameFlags = flags;
+                if (isFirst)
+                {
+                    frameFlags |= Frame.FIRST_FRAME;
+                    isFirst = false;
+                }
+                if (left == 0)
+                {
+                    frameFlags |= Frame.LAST_FRAME;
+                }
+
+                frame(frameFlags, segmentType, track, mevent.Channel, chunk, buf);
+            } while (left != 0);
+        }
+
+        // Returns the calling thread's encoder, creating it on first use.
+        private MSEncoder getEncoder()
+        {
+            if (_encoder != null)
+            {
+                return _encoder;
+            }
+            _encoder = new MSEncoder(4 * 1024);
+            return _encoder;
+        }
+
+        // Encodes a method as a segment of the given type and sends it. When
+        // the method has a payload, a HEADER segment built from its header
+        // structs and a BODY segment with its body follow.
+        private void invokeMethod(Method method, SegmentType type)
+        {
+            MSEncoder encoder = getEncoder();
+            encoder.init();
+            encoder.writeUint16(method.getEncodedType());
+            if (type == SegmentType.COMMAND)
+            {
+                // commands carry an extra 16-bit word that depends on Sync
+                if (method.Sync)
+                {
+                    encoder.writeUint16(0x0101);
+                }
+                else
+                {
+                    encoder.writeUint16(0x0100);
+                }
+            }
+            method.write(encoder);
+            MemoryStream methodSeg = encoder.segment();
+
+            bool payload = method.hasPayload();
+
+            // Without a payload the method segment is also the last segment.
+            byte flags = Frame.FIRST_SEG;
+            if (!payload)
+            {
+                flags |= Frame.LAST_SEG;
+            }
+
+            MemoryStream headerSeg = null;
+            if (payload)
+            {
+                foreach (Struct st in method.Header.Structs)
+                {
+                    encoder.writeStruct32(st);
+                }
+                headerSeg = encoder.segment();
+            }
+
+            // All segments of one method are sent under a single lock.
+            lock (_sendlock)
+            {
+                fragment(flags, type, method, methodSeg);
+                if (payload)
+                {
+                    fragment( 0x0, SegmentType.HEADER, method, headerSeg);
+                    fragment(Frame.LAST_SEG, SegmentType.BODY, method, method.Body);
+                }
+            }
+        }
+
+        #endregion
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/Frame.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/Frame.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/Frame.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/Frame.cs Thu Dec  3 22:03:51 2009
@@ -1,143 +1,143 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.IO;
-
-namespace org.apache.qpid.transport.network
-{
-    public sealed class Frame : NetworkEvent
-    {
-        internal static int HEADER_SIZE = 12;
-
-        // XXX: enums?
-        public const byte L1 = 0;
-        public const byte L2 = 1;
-        public const byte L3 = 2;
-        public const byte L4 = 3;
-
-        public static byte RESERVED = 0x0;
-
-        public static byte VERSION = 0x0;
-
-        public static byte FIRST_SEG = 0x8;
-        public static byte LAST_SEG = 0x4;
-        public static byte FIRST_FRAME = 0x2;
-        public static byte LAST_FRAME = 0x1;
-
-        private readonly byte flags;
-        private readonly SegmentType type;
-        private readonly byte track;
-        private readonly int channel;
-        private readonly MemoryStream body;
-        private int _bodySize;
-
-
-        public Frame(byte flags, SegmentType type, byte track, int channel, int bodySize,
-                     MemoryStream body)
-        {
-            this.flags = flags;
-            this.type = type;
-            this.track = track;
-            this.channel = channel;
-            this.body = body;
-            _bodySize = bodySize;
-        }
-
-        public int BodySize
-        {
-            get { return _bodySize; }
-        }
-
-        public MemoryStream Body
-        {
-            get { return body; }
-        }
-
-        public byte Flags
-        {
-            get { return flags; }
-        }
-
-        public int Channel
-        {
-            get { return channel; }
-        }
-
-        public int Size
-        {
-            get { return (int) body.Length;}
-        }
-
-        public SegmentType Type
-        {
-            get { return type; }
-        }
-
-        public byte Track
-        {
-            get { return track; }
-        }
-
-        private bool flag(byte mask)
-        {
-            return (flags & mask) != 0;
-        }
-
-        public bool isFirstSegment()
-        {
-            return flag(FIRST_SEG);
-        }
-
-        public bool isLastSegment()
-        {
-            return flag(LAST_SEG);
-        }
-
-        public bool isFirstFrame()
-        {
-            return flag(FIRST_FRAME);
-        }
-
-        public bool isLastFrame()
-        {
-            return flag(LAST_FRAME);
-        }
-
-        #region NetworkEvent Methods
-
-        public void ProcessNetworkEvent(NetworkDelegate ndelegate)
-        {
-            ndelegate.Frame(this);
-        }
-
-        #endregion
-
-        public String toString()
-        {
-            return String.Format
-                ("[{0:d} {1:d} {2:d} {3} {4}{5}{6}{7}] ", Channel, Size, Track, Type,                 
-                 isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
-                 isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0);
-        }
-
-      
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+
+namespace org.apache.qpid.transport.network
+{
+    /// <summary>
+    /// A single frame: its flag bits, segment type, track and channel,
+    /// together with the stream that holds the frame body.
+    /// Instances are handed to a NetworkDelegate through ProcessNetworkEvent.
+    /// </summary>
+    public sealed class Frame : NetworkEvent
+    {
+        /// <summary>Length in bytes of a frame header.</summary>
+        internal static int HEADER_SIZE = 12;
+
+        // XXX: enums?
+        // NOTE(review): L1..L4 look like protocol layer identifiers
+        // (L1 is what InputHandler passes to ProtocolError) - confirm.
+        public const byte L1 = 0;
+        public const byte L2 = 1;
+        public const byte L3 = 2;
+        public const byte L4 = 3;
+
+        public static byte RESERVED = 0x0;
+
+        public static byte VERSION = 0x0;
+
+        // Bit masks for the flags byte; each one is tested by the
+        // matching isXxx() method below.
+        public static byte FIRST_SEG = 0x8;
+        public static byte LAST_SEG = 0x4;
+        public static byte FIRST_FRAME = 0x2;
+        public static byte LAST_FRAME = 0x1;
+
+        private readonly byte flags;
+        private readonly SegmentType type;
+        private readonly byte track;
+        private readonly int channel;
+        private readonly MemoryStream body;
+        // Only ever assigned in the constructor, hence readonly like the other fields.
+        private readonly int _bodySize;
+
+        /// <summary>
+        /// Creates a frame from already decoded header values.
+        /// </summary>
+        /// <param name="flags">Combination of the FIRST_SEG / LAST_SEG / FIRST_FRAME / LAST_FRAME bits.</param>
+        /// <param name="type">Segment type of the frame.</param>
+        /// <param name="track">Track value of the frame.</param>
+        /// <param name="channel">Channel value of the frame.</param>
+        /// <param name="bodySize">Size of the frame body as declared by the caller.</param>
+        /// <param name="body">Stream containing the frame body; it is stored as is, not copied.</param>
+        public Frame(byte flags, SegmentType type, byte track, int channel, int bodySize,
+                     MemoryStream body)
+        {
+            this.flags = flags;
+            this.type = type;
+            this.track = track;
+            this.channel = channel;
+            this.body = body;
+            _bodySize = bodySize;
+        }
+
+        /// <summary>Body size that was passed to the constructor.</summary>
+        public int BodySize
+        {
+            get { return _bodySize; }
+        }
+
+        /// <summary>Stream that was passed to the constructor as the frame body.</summary>
+        public MemoryStream Body
+        {
+            get { return body; }
+        }
+
+        /// <summary>Raw flags byte.</summary>
+        public byte Flags
+        {
+            get { return flags; }
+        }
+
+        /// <summary>Channel value of the frame.</summary>
+        public int Channel
+        {
+            get { return channel; }
+        }
+
+        /// <summary>
+        /// Total length of the body stream. This is the length of the whole
+        /// stream object, which is not necessarily equal to BodySize.
+        /// </summary>
+        public int Size
+        {
+            get { return (int) body.Length; }
+        }
+
+        /// <summary>Segment type of the frame.</summary>
+        public SegmentType Type
+        {
+            get { return type; }
+        }
+
+        /// <summary>Track value of the frame.</summary>
+        public byte Track
+        {
+            get { return track; }
+        }
+
+        // True when at least one bit of mask is set in the flags byte.
+        private bool flag(byte mask)
+        {
+            return (flags & mask) != 0;
+        }
+
+        /// <summary>True when the FIRST_SEG bit is set.</summary>
+        public bool isFirstSegment()
+        {
+            return flag(FIRST_SEG);
+        }
+
+        /// <summary>True when the LAST_SEG bit is set.</summary>
+        public bool isLastSegment()
+        {
+            return flag(LAST_SEG);
+        }
+
+        /// <summary>True when the FIRST_FRAME bit is set.</summary>
+        public bool isFirstFrame()
+        {
+            return flag(FIRST_FRAME);
+        }
+
+        /// <summary>True when the LAST_FRAME bit is set.</summary>
+        public bool isLastFrame()
+        {
+            return flag(LAST_FRAME);
+        }
+
+        #region NetworkEvent Methods
+
+        /// <summary>Dispatches this frame to the delegate's Frame callback.</summary>
+        public void ProcessNetworkEvent(NetworkDelegate ndelegate)
+        {
+            ndelegate.Frame(this);
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Builds "[channel size track type ffff] " where ffff are the
+        /// first-segment, last-segment, first-frame and last-frame bits
+        /// printed as 1 or 0.
+        /// </summary>
+        public String toString()
+        {
+            return String.Format
+                ("[{0:d} {1:d} {2:d} {3} {4}{5}{6}{7}] ", Channel, Size, Track, Type,
+                 isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
+                 isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0);
+        }
+
+        /// <summary>
+        /// .NET only calls ToString() (capital T) when an object is formatted
+        /// or logged, so forward it to the existing toString() text instead of
+        /// falling back to the default type-name output.
+        /// </summary>
+        public override String ToString()
+        {
+            return toString();
+        }
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/IIoSender.cs Thu Dec  3 22:03:51 2009
@@ -1,28 +1,28 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-namespace org.apache.qpid.transport.network
-{
-    public interface IIOSender<T>:Sender<T>
-    {
-        void send(T body, int siz);
-    }
-}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+namespace org.apache.qpid.transport.network
+{
+    /// <summary>
+    /// A Sender that can additionally be given an explicit size together
+    /// with the item to send.
+    /// </summary>
+    /// <typeparam name="T">Type of the items handed to the sender.</typeparam>
+    public interface IIOSender<T> : Sender<T>
+    {
+        /// <summary>Sends <paramref name="body"/>.</summary>
+        /// <param name="body">Item to send.</param>
+        /// <param name="siz">
+        /// NOTE(review): presumably the number of bytes of body to transmit -
+        /// confirm against the implementing classes.
+        /// </param>
+        void send(T body, int siz);
+    }
+}

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/InputHandler.cs Thu Dec  3 22:03:51 2009
@@ -1,282 +1,282 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.IO;
-using System.Text;
-using org.apache.qpid.transport.util;
-
-namespace org.apache.qpid.transport.network
-{
-    /// <summary> 
-    /// InputHandler
-    /// </summary>
-    public sealed class InputHandler : Receiver<ReceivedPayload<NetworkEvent>>
-    {
-        public enum State
-        {
-            PROTO_HDR,
-            FRAME_HDR,
-            FRAME_BODY,
-            ERROR
-        }
-
-        private static readonly Logger log = Logger.get(typeof(InputHandler));
-        private readonly Object m_objectLock = new object();
-
-        // the event raised when a buffer is read from the wire        
-        public event EventHandler<ReceivedPayload<NetworkEvent>> ReceivedEvent;
-        public event EventHandler<ExceptionArgs> ExceptionProcessing;
-        public event EventHandler HandlerClosed;
-
-        event EventHandler<ReceivedPayload<NetworkEvent>> Receiver<ReceivedPayload<NetworkEvent>>.Received
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    ReceivedEvent += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    ReceivedEvent -= value;
-                }
-            }
-        }
-
-        event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<NetworkEvent>>.Exception
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    ExceptionProcessing += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    ExceptionProcessing -= value;
-                }
-            }
-        }
-
-        event EventHandler Receiver<ReceivedPayload<NetworkEvent>>.Closed
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    HandlerClosed += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    HandlerClosed -= value;
-                }
-            }
-        }
-
-        private State state;
-        private MemoryStream input;
-        private int needed;
-
-        private byte flags;
-        private SegmentType type;
-        private byte track;
-        private int channel;
-
-        public InputHandler(State state)
-        {
-            this.state = state;
-            switch (state)
-            {
-                case State.PROTO_HDR:
-                    needed = 8;
-                    break;
-                case State.FRAME_HDR:
-                    needed = Frame.HEADER_SIZE;
-                    break;
-            }
-        }
-
-        // The command listening for a buffer read.  
-        public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload)
-        {
-            MemoryStream buf = payload.Payload;
-            int remaining = (int) buf.Length;
-            if( input != null )
-            {
-                remaining += (int) input.Length;
-            }
-            try
-            {
-                while (remaining > 0)
-                {
-                    if (remaining >= needed)
-                    {                        
-                        if (input != null)
-                        {
-                            byte[] tmp = new byte[buf.Length];
-                            buf.Read(tmp, 0, tmp.Length);
-                            input.Write(tmp, 0, tmp.Length);
-                            input.Seek(0, SeekOrigin.Begin);
-                            buf = input;    
-                        }                      
-                        int startPos = (int)buf.Position;
-                        int consumed = needed;
-                        state = next(buf);
-                        if ((buf.Position - startPos) < consumed)
-                        {
-                            buf.Seek(consumed  - (buf.Position - startPos), SeekOrigin.Current);
-                        }
-                        remaining -= consumed;
-                        input = null;                        
-                    }
-                    else
-                    {
-                        byte[] tmp;
-                        if (input == null)
-                        {
-                            input = new MemoryStream();
-                            tmp = new byte[remaining];                            
-                        }
-                        else
-                        {
-                            // this is a full buffer 
-                            tmp = new byte[buf.Length];
-                        }
-                        buf.Read(tmp, 0, tmp.Length);
-                        input.Write(tmp, 0, tmp.Length);
-                        remaining = 0;
-                    }
-                }
-            }
-            catch (Exception t)
-            {
-                Console.Write(t);
-                if (ExceptionProcessing != null)
-                {
-                    ExceptionProcessing(this, new ExceptionArgs(t));
-                }
-            }
-        }
-
-        #region Private Support Functions
-
-        private State next(MemoryStream buf)
-        {
-            BinaryReader reader = new BinaryReader(buf);
-
-            switch (state)
-            {
-                case State.PROTO_HDR:
-                    char a = reader.ReadChar();
-                    char m = reader.ReadChar();
-                    char q = reader.ReadChar();
-                    char p = reader.ReadChar();
-                    if (a != 'A' &&
-                        m != 'M' &&
-                        q != 'Q' &&
-                        p != 'P')
-                    {
-                        Error("bad protocol header: {0}", buf.ToString());
-                        return State.ERROR;
-                    }
-                    reader.ReadByte(); 
-                    byte instance = reader.ReadByte();
-                    byte major = reader.ReadByte();
-                    byte minor = reader.ReadByte();
-                    Fire_NetworkEvent(new ProtocolHeader(instance, major, minor));                    
-                    needed = Frame.HEADER_SIZE;
-                    return State.FRAME_HDR;
-                case State.FRAME_HDR:
-                    reader = new BinaryReader(buf, Encoding.BigEndianUnicode);
-                    flags = reader.ReadByte();
-                    type = SegmentTypeGetter.get(reader.ReadByte()); // generated code 
-                    int size =  reader.ReadUInt16();
-                    size = ByteEncoder.GetBigEndian((UInt16)size);                    
-                    size -= Frame.HEADER_SIZE;
-                    if (size < 0 || size > (64 * 1024 - 12))
-                    {
-                        Error("bad frame size: {0:d}", size);
-                        return State.ERROR;
-                    }
-                    reader.ReadByte();
-                    byte b = reader.ReadByte();
-                    if ((b & 0xF0) != 0)
-                    {
-                        Error("non-zero reserved bits in upper nibble of " +
-                              "frame header byte 5: {0}", b);
-                        return State.ERROR;
-                    }
-                    track = (byte)(b & 0xF);
-                    channel = reader.ReadUInt16();
-                    channel = ByteEncoder.GetBigEndian((UInt16)channel);  
-                    if (size == 0)
-                    {
-                        Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream()));                  
-                        needed = Frame.HEADER_SIZE;
-                        return State.FRAME_HDR;
-                    }
-                    needed = size;
-                    return State.FRAME_BODY;
-                case State.FRAME_BODY:                                       
-                    Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf));                  
-                    needed = Frame.HEADER_SIZE;
-                    return State.FRAME_HDR;
-                default:
-                    if (ExceptionProcessing != null)
-                    {
-                        ExceptionProcessing(this, new ExceptionArgs(new Exception("Error creating frame")));
-                    }
-                    throw new Exception("Error creating frame");
-            }
-        }
-        
-        private void Error(String fmt, params Object[] args)
-        {
-            Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args));            
-        }
-
-        private void Fire_NetworkEvent(NetworkEvent netevent)
-        {
-            log.debug("InputHandler: network event:", netevent);
-            ReceivedPayload<NetworkEvent> payload = new ReceivedPayload<NetworkEvent>();
-            payload.Payload = netevent;
-            if (ReceivedEvent != null)
-            {
-                ReceivedEvent(this, payload);
-            }
-            else
-            {
-                log.debug("Nobody listening for event: {0}");
-            }
-        }
-
-        #endregion
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+using System.Text;
+using org.apache.qpid.transport.util;
+
+namespace org.apache.qpid.transport.network
+{
+    /// <summary> 
+    /// InputHandler
+    /// </summary>
+    /// <summary>
+    /// InputHandler: consumes the raw buffers read from the wire and turns
+    /// them into network events (ProtocolHeader, Frame, ProtocolError) by
+    /// running a small state machine over the incoming bytes.
+    /// </summary>
+    public sealed class InputHandler : Receiver<ReceivedPayload<NetworkEvent>>
+    {
+        /// <summary>What the next complete unit of input is expected to be.</summary>
+        public enum State
+        {
+            PROTO_HDR,
+            FRAME_HDR,
+            FRAME_BODY,
+            ERROR
+        }
+
+        private static readonly Logger log = Logger.get(typeof(InputHandler));
+        // Guards subscription changes made through the explicit Receiver<> events below.
+        private readonly Object m_objectLock = new object();
+
+        // the event raised when a buffer is read from the wire
+        public event EventHandler<ReceivedPayload<NetworkEvent>> ReceivedEvent;
+        // raised when processing a buffer throws
+        public event EventHandler<ExceptionArgs> ExceptionProcessing;
+        // NOTE(review): nothing in this class raises HandlerClosed; it is only subscribed to.
+        public event EventHandler HandlerClosed;
+
+        // Receiver<> view of ReceivedEvent.
+        event EventHandler<ReceivedPayload<NetworkEvent>> Receiver<ReceivedPayload<NetworkEvent>>.Received
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    ReceivedEvent += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    ReceivedEvent -= value;
+                }
+            }
+        }
+
+        // Receiver<> view of ExceptionProcessing.
+        event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<NetworkEvent>>.Exception
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    ExceptionProcessing += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    ExceptionProcessing -= value;
+                }
+            }
+        }
+
+        // Receiver<> view of HandlerClosed.
+        event EventHandler Receiver<ReceivedPayload<NetworkEvent>>.Closed
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    HandlerClosed += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    HandlerClosed -= value;
+                }
+            }
+        }
+
+        private State state;
+        // Bytes carried over from earlier buffers while a unit is still incomplete; null when nothing is pending.
+        private MemoryStream input;
+        // Number of bytes required before next() can be called for the current state.
+        private int needed;
+
+        // Values decoded from the most recent frame header; reused when the frame body arrives.
+        private byte flags;
+        private SegmentType type;
+        private byte track;
+        private int channel;
+
+        /// <summary>
+        /// Creates a handler that starts in the given state.
+        /// </summary>
+        /// <param name="state">
+        /// Initial state. PROTO_HDR waits for 8 bytes, FRAME_HDR for
+        /// Frame.HEADER_SIZE bytes; for any other value the byte count stays 0.
+        /// </param>
+        public InputHandler(State state)
+        {
+            this.state = state;
+            switch (state)
+            {
+                case State.PROTO_HDR:
+                    needed = 8;
+                    break;
+                case State.FRAME_HDR:
+                    needed = Frame.HEADER_SIZE;
+                    break;
+            }
+        }
+
+        /// <summary>
+        /// The command listening for a buffer read. Feeds the buffer (plus
+        /// any bytes kept from previous calls) through the state machine,
+        /// one complete unit at a time. Any exception is written to the
+        /// console and reported through ExceptionProcessing, not rethrown.
+        /// </summary>
+        /// <param name="sender">Not used.</param>
+        /// <param name="payload">Wrapper whose Payload is the buffer just read.</param>
+        public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload)
+        {
+            MemoryStream buf = payload.Payload;
+            int remaining = (int) buf.Length;
+            if( input != null )
+            {
+                // bytes left over from earlier buffers count as available too
+                remaining += (int) input.Length;
+            }
+            try
+            {
+                while (remaining > 0)
+                {
+                    if (remaining >= needed)
+                    {
+                        if (input != null)
+                        {
+                            // Append the new buffer to the pending bytes and parse from the combined stream.
+                            byte[] tmp = new byte[buf.Length];
+                            buf.Read(tmp, 0, tmp.Length);
+                            input.Write(tmp, 0, tmp.Length);
+                            input.Seek(0, SeekOrigin.Begin);
+                            buf = input;
+                        }
+                        int startPos = (int)buf.Position;
+                        // Captured before next() runs, because next() overwrites needed for the following state.
+                        int consumed = needed;
+                        state = next(buf);
+                        if ((buf.Position - startPos) < consumed)
+                        {
+                            // Skip whatever part of this unit next() did not read itself.
+                            buf.Seek(consumed  - (buf.Position - startPos), SeekOrigin.Current);
+                        }
+                        remaining -= consumed;
+                        input = null;
+                    }
+                    else
+                    {
+                        // Not enough bytes for a complete unit: stash them until the next buffer arrives.
+                        byte[] tmp;
+                        if (input == null)
+                        {
+                            input = new MemoryStream();
+                            tmp = new byte[remaining];
+                        }
+                        else
+                        {
+                            // this is a full buffer
+                            tmp = new byte[buf.Length];
+                        }
+                        buf.Read(tmp, 0, tmp.Length);
+                        input.Write(tmp, 0, tmp.Length);
+                        remaining = 0;
+                    }
+                }
+            }
+            catch (Exception t)
+            {
+                Console.Write(t);
+                if (ExceptionProcessing != null)
+                {
+                    ExceptionProcessing(this, new ExceptionArgs(t));
+                }
+            }
+        }
+
+        #region Private Support Functions
+
+        /// <summary>
+        /// Processes one complete unit for the current state, fires the
+        /// matching network event, updates the number of bytes needed for
+        /// the following unit and returns the state to switch to.
+        /// </summary>
+        /// <param name="buf">Stream positioned at the first byte of the unit.</param>
+        /// <returns>The next state; State.ERROR after a ProtocolError was fired.</returns>
+        private State next(MemoryStream buf)
+        {
+            BinaryReader reader = new BinaryReader(buf);
+
+            switch (state)
+            {
+                case State.PROTO_HDR:
+                    // Read the magic as raw bytes: ReadChar() decodes through the
+                    // reader's text encoding and may consume more than one byte
+                    // for a single character on malformed input.
+                    byte a = reader.ReadByte();
+                    byte m = reader.ReadByte();
+                    byte q = reader.ReadByte();
+                    byte p = reader.ReadByte();
+                    // Reject the header as soon as ANY of the four bytes is wrong
+                    // (joining the tests with && only rejected it when all four were wrong).
+                    if (a != (byte) 'A' ||
+                        m != (byte) 'M' ||
+                        q != (byte) 'Q' ||
+                        p != (byte) 'P')
+                    {
+                        Error("bad protocol header: {0}", buf.ToString());
+                        return State.ERROR;
+                    }
+                    reader.ReadByte(); // fifth header byte is read and ignored
+                    byte instance = reader.ReadByte();
+                    byte major = reader.ReadByte();
+                    byte minor = reader.ReadByte();
+                    Fire_NetworkEvent(new ProtocolHeader(instance, major, minor));
+                    needed = Frame.HEADER_SIZE;
+                    return State.FRAME_HDR;
+                case State.FRAME_HDR:
+                    reader = new BinaryReader(buf, Encoding.BigEndianUnicode);
+                    flags = reader.ReadByte();
+                    type = SegmentTypeGetter.get(reader.ReadByte()); // generated code
+                    // ReadUInt16 is little-endian; presumably GetBigEndian converts
+                    // the value to the wire's byte order - confirm in ByteEncoder.
+                    int size =  reader.ReadUInt16();
+                    size = ByteEncoder.GetBigEndian((UInt16)size);
+                    // the size read from the header includes the header itself; keep only the body length
+                    size -= Frame.HEADER_SIZE;
+                    if (size < 0 || size > (64 * 1024 - 12))
+                    {
+                        Error("bad frame size: {0:d}", size);
+                        return State.ERROR;
+                    }
+                    reader.ReadByte(); // header byte 4 is read and ignored
+                    byte b = reader.ReadByte();
+                    if ((b & 0xF0) != 0)
+                    {
+                        Error("non-zero reserved bits in upper nibble of " +
+                              "frame header byte 5: {0}", b);
+                        return State.ERROR;
+                    }
+                    track = (byte)(b & 0xF);
+                    channel = reader.ReadUInt16();
+                    channel = ByteEncoder.GetBigEndian((UInt16)channel);
+                    // Only 8 of the header bytes are read here; the caller skips the rest.
+                    if (size == 0)
+                    {
+                        // empty body: emit the frame right away and wait for the next header
+                        Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream()));
+                        needed = Frame.HEADER_SIZE;
+                        return State.FRAME_HDR;
+                    }
+                    needed = size;
+                    return State.FRAME_BODY;
+                case State.FRAME_BODY:
+                    // The frame receives the stream itself (not a copy), positioned at the body's first byte.
+                    Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf));
+                    needed = Frame.HEADER_SIZE;
+                    return State.FRAME_HDR;
+                default:
+                    // Reached for State.ERROR. NOTE(review): the exception thrown below is
+                    // caught in On_ReceivedBuffer, which raises ExceptionProcessing again.
+                    if (ExceptionProcessing != null)
+                    {
+                        ExceptionProcessing(this, new ExceptionArgs(new Exception("Error creating frame")));
+                    }
+                    throw new Exception("Error creating frame");
+            }
+        }
+
+        // Reports a protocol violation to the listeners as a ProtocolError event.
+        private void Error(String fmt, params Object[] args)
+        {
+            Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args));
+        }
+
+        // Wraps the event in a ReceivedPayload and raises ReceivedEvent if anybody subscribed.
+        private void Fire_NetworkEvent(NetworkEvent netevent)
+        {
+            // Format strings and arguments now agree: the first message used to pass
+            // the event without a placeholder, the second had "{0}" without an argument.
+            // NOTE(review): assumes Logger.debug formats like String.Format - confirm.
+            log.debug("InputHandler: network event: {0}", netevent);
+            ReceivedPayload<NetworkEvent> payload = new ReceivedPayload<NetworkEvent>();
+            payload.Payload = netevent;
+            if (ReceivedEvent != null)
+            {
+                ReceivedEvent(this, payload);
+            }
+            else
+            {
+                log.debug("Nobody listening for event: {0}", netevent);
+            }
+        }
+
+        #endregion
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkDelegate.cs Thu Dec  3 22:03:51 2009
@@ -1,40 +1,40 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using ProtocolError = org.apache.qpid.transport.ProtocolError;
-using ProtocolHeader = org.apache.qpid.transport.ProtocolHeader;
-namespace org.apache.qpid.transport.network
-{
-	
-	
-	/// <summary> 
-    /// NetworkDelegate
-	/// </summary>	
-	
-	public interface NetworkDelegate
-	{
-		
-		void  Init(ProtocolHeader header);
-		
-		void  Frame(Frame frame);
-		
-		void  Error(ProtocolError error);
-	}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using ProtocolError = org.apache.qpid.transport.ProtocolError;
+using ProtocolHeader = org.apache.qpid.transport.ProtocolHeader;
+namespace org.apache.qpid.transport.network
+{
+	
+	
+    /// <summary>
+    /// NetworkDelegate: callback interface with one method per kind of input
+    /// (protocol header, frame, protocol error).
+    /// NOTE(review): presumably the target that a NetworkEvent dispatches itself to
+    /// through NetworkEvent.ProcessNetworkEvent - confirm against the implementations.
+    /// </summary>
+    public interface NetworkDelegate
+    {
+        /// <summary>Handles a protocol header.</summary>
+        /// <param name="header">The protocol header to process.</param>
+        void Init(ProtocolHeader header);
+
+        /// <summary>Handles a frame.</summary>
+        /// <param name="frame">The frame to process.</param>
+        void Frame(Frame frame);
+
+        /// <summary>Handles a protocol error.</summary>
+        /// <param name="error">The protocol error to process.</param>
+        void Error(ProtocolError error);
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/NetworkEvent.cs Thu Dec  3 22:03:51 2009
@@ -1,32 +1,32 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-namespace org.apache.qpid.transport.network
-{
-	
-	/// <summary> 
-    /// NetworkEvent
-	/// </summary>
-	
-	public interface NetworkEvent
-	{		
-		 void  ProcessNetworkEvent(NetworkDelegate networkDelegate);
-	}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+namespace org.apache.qpid.transport.network
+{
+	
+    /// <summary>
+    /// NetworkEvent: something that can be processed against a NetworkDelegate.
+    /// </summary>
+    public interface NetworkEvent
+    {
+        /// <summary>
+        /// Processes this event using the supplied delegate.
+        /// NOTE(review): presumably each implementation calls the delegate method that
+        /// matches its own kind (Init / Frame / Error) - confirm against implementations.
+        /// </summary>
+        /// <param name="networkDelegate">The delegate that receives the event.</param>
+        void ProcessNetworkEvent(NetworkDelegate networkDelegate);
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IIoTransport.cs Thu Dec  3 22:03:51 2009
@@ -1,57 +1,57 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-using System.IO;
-using System.Net.Sockets;
-
-namespace org.apache.qpid.transport.network.io
-{
-    public interface IIoTransport
-    {
-        Connection Connection
-        {
-            get;
-            set;
-        }
-
-        Receiver<ReceivedPayload<MemoryStream>> Receiver
-        {
-            get;
-            set;
-        }
-
-        IoSender Sender
-        {
-            get;
-            set;
-        }
-
-
-        Stream Stream
-        {
-            get;
-            set;
-        }
-
-        TcpClient Socket
-        {
-            get;
-            set;
-        }
-    }
-}
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+using System.IO;
+using System.Net.Sockets;
+
+namespace org.apache.qpid.transport.network.io
+{
+    /// <summary>
+    /// IIoTransport: exposes the objects that make up an I/O transport - the
+    /// connection, the receiver and sender, the underlying stream and the TCP client.
+    /// Every property is readable and writable.
+    /// </summary>
+    public interface IIoTransport
+    {
+        /// <summary>Gets or sets the connection associated with this transport.</summary>
+        Connection Connection { get; set; }
+
+        /// <summary>Gets or sets the receiver that delivers MemoryStream payloads.</summary>
+        Receiver<ReceivedPayload<MemoryStream>> Receiver { get; set; }
+
+        /// <summary>Gets or sets the sender of this transport.</summary>
+        IoSender Sender { get; set; }
+
+        /// <summary>Gets or sets the stream of this transport.</summary>
+        Stream Stream { get; set; }
+
+        /// <summary>Gets or sets the TCP client of this transport.</summary>
+        TcpClient Socket { get; set; }
+    }
+}

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/network/io/IoReceiver.cs Thu Dec  3 22:03:51 2009
@@ -1,189 +1,189 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.IO;
-using System.Threading;
-using Logger = org.apache.qpid.transport.util.Logger;
-
-
-namespace org.apache.qpid.transport.network.io
-{
-    /// <summary> 
-    /// IoReceiver
-    /// </summary>
-    public sealed class IoReceiver : Receiver<ReceivedPayload<MemoryStream>>
-    {
-        private static readonly Logger log = Logger.get(typeof(IoReceiver));      
-        private readonly int m_bufferSize;
-        private readonly Stream m_bufStream;
-        private readonly int m_timeout;
-        private readonly Thread m_thread;
-        private bool m_closed;
-        private readonly Object m_objectLock = new object();
-        
-        // the event raised when a buffer is read from the wire        
-        event EventHandler<ReceivedPayload<MemoryStream>> ReceivedBuffer;
-        event EventHandler<ExceptionArgs> ExceptionReading;
-        event EventHandler ReceiverClosed;
-
-        event EventHandler<ReceivedPayload<MemoryStream>> Receiver<ReceivedPayload<MemoryStream>>.Received
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    ReceivedBuffer += value;                  
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    ReceivedBuffer -= value;
-                }
-            }
-        }
-
-        event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<MemoryStream>>.Exception
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    ExceptionReading += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    ExceptionReading -= value;
-                }
-            }
-        }
-
-        event EventHandler Receiver<ReceivedPayload<MemoryStream>>.Closed
-        {
-            add
-            {
-                lock (m_objectLock)
-                {
-                    ReceiverClosed += value;
-                }
-            }
-            remove
-            {
-                lock (m_objectLock)
-                {
-                    ReceiverClosed -= value;
-                }
-            }
-        }
-
-        public IoReceiver(Stream stream, int bufferSize, int timeout)
-        {     
-            m_bufferSize = bufferSize;
-            m_bufStream = stream; 
-            m_timeout = timeout;
-            m_thread = new Thread(Go);
-            m_thread.Name = String.Format("IoReceiver - {0}", stream);
-            m_thread.IsBackground = true;
-            m_thread.Start();
-        }
-
-        public void close()
-        {
-            Mutex mut = new Mutex();
-            mut.WaitOne();
-            if (!m_closed)
-            {
-                m_closed = true;               
-                try
-                {
-                    log.debug("Receiver closing");
-                    m_bufStream.Close();
-                    m_thread.Join(m_timeout);
-                    if (m_thread.IsAlive)
-                    {
-                        throw new TransportException("join timed out");
-                    }
-                }
-                catch (ThreadInterruptedException e)
-                {
-                    throw new TransportException(e);
-                }
-                catch (IOException e)
-                {
-                    throw new TransportException(e);
-                }                                                    
-            }
-            mut.ReleaseMutex();
-        }
-
-        void Go()
-        {
-            // create a BufferedStream on top of the NetworkStream.
-            int threshold = m_bufferSize/2;
-            byte[] buffer = new byte[m_bufferSize];
-            try
-            {
-                int read;
-                int offset = 0;
-                ReceivedPayload<MemoryStream> payload = new ReceivedPayload<MemoryStream>();
-                while ((read = m_bufStream.Read(buffer, offset, m_bufferSize - offset)) > 0)
-                {
-                    MemoryStream memStream = new MemoryStream(buffer, offset, read);
-                    if (ReceivedBuffer != null)
-                    {
-                        // call the event 
-                        payload.Payload = memStream;
-                        ReceivedBuffer(this, payload);
-                    }
-                    offset += read;
-                    if (offset > threshold)
-                    {
-                        offset = 0;
-                        buffer = new byte[m_bufferSize];
-                    }
-                }
-                log.debug("Receiver thread terminating");
-            }
-            catch (IOException e)
-            {
-                // IOException is thrown when the socket is closed according to the docs
-            }
-            catch (Exception t)
-            {
-                if (ExceptionReading != null)
-                {
-                    ExceptionReading(this, new ExceptionArgs(t));
-                }
-            }
-            finally
-            {
-                if (ReceiverClosed != null)
-                {
-                    ReceiverClosed(this, new EventArgs());
-                }
-            }
-        }
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.IO;
+using System.Threading;
+using Logger = org.apache.qpid.transport.util.Logger;
+
+
+namespace org.apache.qpid.transport.network.io
+{
+    /// <summary>
+    /// IoReceiver: reads bytes from a stream on a dedicated background thread and
+    /// raises an event for every chunk that was read.
+    /// </summary>
+    /// <remarks>
+    /// The reading thread is started by the constructor. Received chunks, unexpected
+    /// exceptions and termination of the thread are reported through the Received,
+    /// Exception and Closed events of the Receiver interface.
+    /// </remarks>
+    public sealed class IoReceiver : Receiver<ReceivedPayload<MemoryStream>>
+    {
+        private static readonly Logger log = Logger.get(typeof(IoReceiver));
+        private readonly int m_bufferSize;
+        private readonly Stream m_bufStream;
+        private readonly int m_timeout;
+        private readonly Thread m_thread;
+        private bool m_closed;
+        // guards the event subscriptions and the m_closed flag
+        private readonly Object m_objectLock = new object();
+
+        // the event raised when a buffer is read from the wire
+        event EventHandler<ReceivedPayload<MemoryStream>> ReceivedBuffer;
+        // the event raised when reading fails with something other than an IOException
+        event EventHandler<ExceptionArgs> ExceptionReading;
+        // the event raised when the reading thread terminates, whatever the reason
+        event EventHandler ReceiverClosed;
+
+        event EventHandler<ReceivedPayload<MemoryStream>> Receiver<ReceivedPayload<MemoryStream>>.Received
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    ReceivedBuffer += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    ReceivedBuffer -= value;
+                }
+            }
+        }
+
+        event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<MemoryStream>>.Exception
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    ExceptionReading += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    ExceptionReading -= value;
+                }
+            }
+        }
+
+        event EventHandler Receiver<ReceivedPayload<MemoryStream>>.Closed
+        {
+            add
+            {
+                lock (m_objectLock)
+                {
+                    ReceiverClosed += value;
+                }
+            }
+            remove
+            {
+                lock (m_objectLock)
+                {
+                    ReceiverClosed -= value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates the receiver and immediately starts its background reading thread.
+        /// </summary>
+        /// <param name="stream">The stream to read from.</param>
+        /// <param name="bufferSize">Size in bytes of the read buffer.</param>
+        /// <param name="timeout">Timeout passed to Thread.Join when closing.</param>
+        public IoReceiver(Stream stream, int bufferSize, int timeout)
+        {
+            m_bufferSize = bufferSize;
+            m_bufStream = stream;
+            m_timeout = timeout;
+            m_thread = new Thread(Go);
+            m_thread.Name = String.Format("IoReceiver - {0}", stream);
+            m_thread.IsBackground = true;
+            m_thread.Start();
+        }
+
+        /// <summary>
+        /// Closes the underlying stream and waits for the reading thread to finish.
+        /// Only the first call does any work; later calls return immediately.
+        /// </summary>
+        /// <exception cref="TransportException">
+        /// The reading thread did not terminate within the timeout, the wait was
+        /// interrupted, or closing the stream failed with an IOException.
+        /// </exception>
+        public void close()
+        {
+            // A Mutex created locally on each call is private to that call and
+            // therefore excludes nobody; the flag is flipped under a lock shared by
+            // all callers instead. The lock is released before the blocking work so
+            // that event (un)subscription is never held up by a pending Join.
+            lock (m_objectLock)
+            {
+                if (m_closed)
+                {
+                    return;
+                }
+                m_closed = true;
+            }
+
+            try
+            {
+                log.debug("Receiver closing");
+                m_bufStream.Close();
+                // When close() is invoked from an event handler running on the
+                // receiver thread, joining would wait on the current thread itself
+                // and could only end in a time-out.
+                if (Thread.CurrentThread != m_thread)
+                {
+                    m_thread.Join(m_timeout);
+                    if (m_thread.IsAlive)
+                    {
+                        throw new TransportException("join timed out");
+                    }
+                }
+            }
+            catch (ThreadInterruptedException e)
+            {
+                throw new TransportException(e);
+            }
+            catch (IOException e)
+            {
+                throw new TransportException(e);
+            }
+        }
+
+        /// <summary>
+        /// Body of the reading thread: reads until the stream returns no more data
+        /// or throws, raising ReceivedBuffer for each chunk, and finally raises
+        /// ReceiverClosed.
+        /// </summary>
+        void Go()
+        {
+            // Chunks are handed out as MemoryStream views over the read buffer. Once
+            // more than half of the buffer has been handed out a fresh buffer is
+            // allocated, so arrays referenced by earlier views are never overwritten.
+            int threshold = m_bufferSize/2;
+            byte[] buffer = new byte[m_bufferSize];
+            try
+            {
+                int read;
+                int offset = 0;
+                ReceivedPayload<MemoryStream> payload = new ReceivedPayload<MemoryStream>();
+                while ((read = m_bufStream.Read(buffer, offset, m_bufferSize - offset)) > 0)
+                {
+                    MemoryStream memStream = new MemoryStream(buffer, offset, read);
+                    // Copy the delegate first: a handler removed between the null
+                    // check and the invocation would otherwise cause a
+                    // NullReferenceException.
+                    EventHandler<ReceivedPayload<MemoryStream>> received = ReceivedBuffer;
+                    if (received != null)
+                    {
+                        // call the event
+                        payload.Payload = memStream;
+                        received(this, payload);
+                    }
+                    offset += read;
+                    if (offset > threshold)
+                    {
+                        offset = 0;
+                        buffer = new byte[m_bufferSize];
+                    }
+                }
+                log.debug("Receiver thread terminating");
+            }
+            catch (IOException e)
+            {
+                // IOException is thrown when the socket is closed according to the docs,
+                // so it is not reported to the listeners; leave a trace for diagnosis.
+                log.debug("Receiver thread stopped by an I/O error: {0}", e.Message);
+            }
+            catch (Exception t)
+            {
+                EventHandler<ExceptionArgs> failed = ExceptionReading;
+                if (failed != null)
+                {
+                    failed(this, new ExceptionArgs(t));
+                }
+            }
+            finally
+            {
+                EventHandler closed = ReceiverClosed;
+                if (closed != null)
+                {
+                    closed(this, EventArgs.Empty);
+                }
+            }
+        }
+    }
 }
\ No newline at end of file



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message