qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r886940 [4/14] - in /qpid/trunk/qpid/dotnet/client-010: ./ addins/ addins/ExcelAddIn/ addins/ExcelAddInMessageProcessor/ addins/ExcelAddInProducer/ client/ client/client/ client/transport/ client/transport/codec/ client/transport/exception/...
Date Thu, 03 Dec 2009 22:03:55 GMT
Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/Session.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/Session.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/Session.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/Session.cs Thu Dec  3 22:03:51 2009
@@ -1,522 +1,522 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Text;
-using System.Threading;
-using org.apache.qpid.transport.util;
-using Frame = org.apache.qpid.transport.network.Frame;
-using Logger = org.apache.qpid.transport.util.Logger;
-
-
-namespace org.apache.qpid.transport
-{
-    /// <summary>
-    ///  Session
-    /// 
-    /// </summary>
-    public class Session : Invoker
-    {
-        private static readonly Logger log = Logger.get(typeof (Session));
-        private static readonly bool ENABLE_REPLAY;
-
-        static Session()
-        {
-            const string enableReplay = "enable_command_replay";
-            try
-            {
-                String var = Environment.GetEnvironmentVariable(enableReplay);
-                if (var != null)
-                {
-                    ENABLE_REPLAY = bool.Parse(var);
-                }
-            }
-            catch (Exception)
-            {
-                ENABLE_REPLAY = false;
-            }
-        }
-
-        private readonly byte[] _name;
-        private const long _timeout = 600000;
-        private bool _autoSync = false;
-
-        // channel may be null
-        private Channel _channel;
-
-        // incoming command count
-        private int _commandsIn = 0;
-        // completed incoming commands
-        private readonly Object _processedLock = new Object();
-        private RangeSet _processed = new RangeSet();
-        private int _maxProcessed = - 1;
-        private int _syncPoint = -1;
-
-        // outgoing command count
-        private int _commandsOut = 0;
-        private readonly Dictionary<int, Method> _commands = new Dictionary<int, Method>();
-        private int _maxComplete = - 1;
-        private bool _needSync = false;
-        private bool _closed;
-        private readonly Dictionary<int, Future> _results = new Dictionary<int, Future>();
-        private readonly List<ExecutionException> _exceptions = new List<ExecutionException>();
-
-
-        public bool Closed
-        {
-            get
-            {
-                lock (this)
-                {
-                    return _closed;
-                }
-            }
-            set
-            {
-                lock (this)
-                {
-                    _closed = value;
-                }
-            }
-        }
-
-        public string Name
-        {
-            get
-            {
-                ASCIIEncoding enc = new ASCIIEncoding();
-                return enc.GetString(_name);
-            }
-        }
-
-        public Session(byte[] name)
-        {
-            _name = name;
-        }
-
-        public byte[] getName()
-        {
-            return _name;
-        }
-
-        public void setAutoSync(bool value)
-        {
-            lock (_commands)
-            {
-                _autoSync = value;
-            }
-        }
-
-        public Dictionary<int, Method> getOutstandingCommands()
-        {
-            return _commands;
-        }
-
-        public int getCommandsOut()
-        {
-            return _commandsOut;
-        }
-
-        public int CommandsIn
-        {
-            get { return _commandsIn; }
-            set { _commandsIn = value; }
-        }
-
-        public int nextCommandId()
-        {
-            return _commandsIn++;
-        }
-
-        public void identify(Method cmd)
-        {
-            int id = nextCommandId();
-            cmd.Id = id;
-
-            if (log.isDebugEnabled())
-            {
-                log.debug("ID: [{0}] %{1}", _channel, id);
-            }
-
-            //if ((id % 65536) == 0)
-            if ((id & 0xff) == 0)
-            {
-                flushProcessed(Option.TIMELY_REPLY);
-            }
-        }
-
-        public void processed(Method command)
-        {
-            processed(command.Id);
-        }
-
-        public void processed(int command)
-        {
-            processed(new Range(command, command));
-        }
-
-        public void processed(int lower, int upper)
-        {
-            processed(new Range(lower, upper));
-        }
-
-        public void processed(Range range)
-        {
-            log.debug("{0} processed({1})", this, range);
-
-            bool flush;
-            lock (_processedLock)
-            {
-                _processed.add(range);
-                Range first = _processed.getFirst();
-                int lower = first.Lower;
-                int upper = first.Upper;
-                int old = _maxProcessed;
-                if (Serial.le(lower, _maxProcessed + 1))
-                {
-                    _maxProcessed = Serial.max(_maxProcessed, upper);
-                }
-                flush = Serial.lt(old, _syncPoint) && Serial.ge(_maxProcessed, _syncPoint);
-                _syncPoint = _maxProcessed;
-            }
-            if (flush)
-            {
-                flushProcessed();
-            }
-        }
-
-        public void flushProcessed(params Option[] options)
-        {
-            RangeSet copy;
-            lock (_processedLock)
-            {
-                copy = _processed.copy();
-            }
-            sessionCompleted(copy, options);
-        }
-
-        public void knownComplete(RangeSet kc)
-        {
-            lock (_processedLock)
-            {
-                RangeSet newProcessed = new RangeSet();
-                foreach (Range pr in _processed)
-                {
-                    foreach (Range kr in kc)
-                    {
-                        foreach (Range r in pr.subtract(kr))
-                        {
-                            newProcessed.add(r);
-                        }
-                    }
-                }
-                _processed = newProcessed;
-            }
-        }
-
-        public void syncPoint()
-        {
-            int id = CommandsIn - 1;
-            log.debug("{0} synced to {1}", this, id);
-            bool flush;
-            lock (_processedLock)
-            {
-                _syncPoint = id;
-                flush = Serial.ge(_maxProcessed, _syncPoint);
-            }
-            if (flush)
-            {
-                flushProcessed();
-            }
-        }
-
-        public void attach(Channel channel)
-        {
-            _channel = channel;
-            _channel.Session = this;
-        }
-
-        public Method getCommand(int id)
-        {
-            lock (_commands)
-            {
-                return _commands[id];
-            }
-        }
-
-        public bool complete(int lower, int upper)
-        {
-            //avoid autoboxing
-            if (log.isDebugEnabled())
-            {
-                log.debug("{0} complete({1}, {2})", this, lower, upper);
-            }
-            lock (_commands)
-            {
-                int old = _maxComplete;
-                for (int id = Serial.max(_maxComplete, lower); Serial.le(id, upper); id++)
-                {
-                    _commands.Remove(id);
-                }
-                if (Serial.le(lower, _maxComplete + 1))
-                {
-                    _maxComplete = Serial.max(_maxComplete, upper);
-                }
-                log.debug("{0} commands remaining: {1}", this, _commands);
-                Monitor.PulseAll(_commands);
-                return Serial.gt(_maxComplete, old);
-            }
-        }
-
-        protected override void invoke(Method m)
-        {
-            if (Closed)
-            {
-                List<ExecutionException> exc = getExceptions();
-                if (exc.Count > 0)
-                {
-                    throw new SessionException(exc);
-                }
-                else if (_close != null)
-                {
-                    throw new ConnectionException(_close);
-                }
-                else
-                {
-                    throw new SessionClosedException();
-                }
-            }
-
-            if (m.EncodedTrack == Frame.L4)
-            {
-                lock (_commands)
-                {
-                    int next = _commandsOut++;
-                    m.Id = next;
-                    if (next == 0)
-                    {
-                        sessionCommandPoint(0, 0);
-                    }
-                    if (ENABLE_REPLAY)
-                    {
-                        _commands.Add(next, m);
-                    }
-                    if (_autoSync)
-                    {
-                        m.Sync = true;
-                    }
-                    _needSync = ! m.Sync;
-                    _channel.method(m);
-                    if (_autoSync)
-                    {
-                        sync();
-                    }
-
-                    // flush every 64K commands to avoid ambiguity on
-                    // wraparound
-                    if ((next%65536) == 0)
-                    {
-                        sessionFlush(Option.COMPLETED);
-                    }
-                }
-            }
-            else
-            {
-                _channel.method(m);
-            }
-        }
-
-        public void sync()
-        {
-            sync(_timeout);
-        }
-
-        public void sync(long timeout)
-        {
-            log.debug("{0} sync()", this);
-            lock (_commands)
-            {
-                int point = _commandsOut - 1;
-
-                if (_needSync && Serial.lt(_maxComplete, point))
-                {
-                    executionSync(Option.SYNC);
-                }
-
-                DateTime start = DateTime.Now;
-                long elapsed = 0;
-
-                while (! Closed && elapsed < timeout && Serial.lt(_maxComplete, point))
-                {
-                    log.debug("{0}   waiting for[{1}]: {2}, {3}", this, point,
-                              _maxComplete, _commands);
-                    Monitor.Wait(_commands, (int) (timeout - elapsed));
-                    elapsed = DateTime.Now.Subtract(start).Milliseconds;
-                }
-
-                if (Serial.lt(_maxComplete, point))
-                {
-                    if (Closed)
-                    {
-                        throw new SessionException(getExceptions());
-                    }
-                    else
-                    {
-                        throw new Exception
-                            (String.Format
-                                 ("timed out waiting for sync: complete = {0}, point = {1}", _maxComplete, point));
-                    }
-                }
-            }
-        }
-
-
-        public void result(int command, Struct result)
-        {
-            Future future;
-            lock (_results)
-            {
-                if (_results.ContainsKey(command))
-                {
-                    future = _results[command];
-                    _results.Remove(command);
-                }
-                else
-                {
-                    throw new Exception(String.Format("Cannot ger result {0} for {1}", command, result));
-                }
-            }
-            future.Result = result;
-        }
-
-        public void addException(ExecutionException exc)
-        {
-            lock (_exceptions)
-            {
-                _exceptions.Add(exc);
-            }
-        }
-
-        private ConnectionClose _close = null;
-
-        public void closeCode(ConnectionClose close)
-        {
-            _close = close;
-        }
-
-        public List<ExecutionException> getExceptions()
-        {
-            lock (_exceptions)
-            {
-                return new List<ExecutionException>(_exceptions);
-            }
-        }
-
-        public override Future invoke(Method m, Future future)     
-        {
-            lock (_commands)
-            {
-                future.Session = this;
-                int command = _commandsOut;
-                lock (_results)
-                {
-                    _results.Add(command, future);
-                }
-                invoke(m);
-            }
-            return future;
-        }
-
-
-        public void messageTransfer(String destination,
-                                    MessageAcceptMode acceptMode,
-                                    MessageAcquireMode acquireMode,
-                                    Header header,
-                                    byte[] body,
-                                    params Option[] options)
-        {
-            MemoryStream mbody = new MemoryStream();
-            mbody.Write(body,0, body.Length);
-            messageTransfer(destination, acceptMode, acquireMode, header,
-                            mbody, options);
-        }
-
-        public void messageTransfer(String destination,
-                                    MessageAcceptMode acceptMode,
-                                    MessageAcquireMode acquireMode,
-                                    Header header,
-                                    String body,
-                                    params Option[] options)
-        {
-            messageTransfer(destination, acceptMode, acquireMode, header,
-                            new MemoryStream(Convert.ToByte(body)), options);
-        }
-
-        public void close()
-        {
-            sessionRequestTimeout(0);
-            sessionDetach(_name);
-            lock (_commands)
-            {
-                DateTime start = DateTime.Now;
-                long elapsed = 0;
-
-                while (! Closed && elapsed < _timeout)
-                {
-                    Monitor.Wait(_commands, (int) (_timeout - elapsed));
-                    elapsed = DateTime.Now.Subtract(start).Milliseconds;
-                }
-            }
-        }
-
-        public void exception(Exception t)
-        {
-            log.error(t, "Caught exception");
-        }
-
-        public void closed()
-        {
-            Closed = true;
-            lock (_commands)
-            {
-                Monitor.PulseAll(_commands);
-            }
-            lock (_results)
-            {
-                foreach (Future result in _results.Values)
-                {
-                    lock (result)
-                    {
-                        Monitor.PulseAll(result);
-                    }
-                }
-            }
-            _channel.Session = null;
-            _channel = null;
-        }
-
-        public String toString()
-        {
-            return String.Format("session:{0}", _name);
-        }
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using System.Threading;
+using org.apache.qpid.transport.util;
+using Frame = org.apache.qpid.transport.network.Frame;
+using Logger = org.apache.qpid.transport.util.Logger;
+
+
+namespace org.apache.qpid.transport
+{
+    /// <summary>
+    ///  Session
+    /// 
+    /// </summary>
+    public class Session : Invoker
+    {
+        private static readonly Logger log = Logger.get(typeof (Session));
+        private static readonly bool ENABLE_REPLAY;
+
+        /// <summary>
+        /// Reads the "enable_command_replay" environment variable once and stores the
+        /// parsed flag in ENABLE_REPLAY. The flag stays false when the variable is
+        /// unset, is not a valid boolean, or cannot be read.
+        /// </summary>
+        static Session()
+        {
+            bool replay = false;
+            try
+            {
+                String raw = Environment.GetEnvironmentVariable("enable_command_replay");
+                bool parsed;
+                // TryParse reports failure (instead of throwing) for null or malformed text.
+                if (bool.TryParse(raw, out parsed))
+                {
+                    replay = parsed;
+                }
+            }
+            catch (Exception)
+            {
+                // Reading the environment failed; keep replay disabled.
+                replay = false;
+            }
+            ENABLE_REPLAY = replay;
+        }
+
+        private readonly byte[] _name;
+        private const long _timeout = 600000;
+        private bool _autoSync = false;
+
+        // channel may be null
+        private Channel _channel;
+
+        // incoming command count
+        private int _commandsIn = 0;
+        // completed incoming commands
+        private readonly Object _processedLock = new Object();
+        private RangeSet _processed = new RangeSet();
+        private int _maxProcessed = - 1;
+        private int _syncPoint = -1;
+
+        // outgoing command count
+        private int _commandsOut = 0;
+        private readonly Dictionary<int, Method> _commands = new Dictionary<int, Method>();
+        private int _maxComplete = - 1;
+        private bool _needSync = false;
+        private bool _closed;
+        private readonly Dictionary<int, Future> _results = new Dictionary<int, Future>();
+        private readonly List<ExecutionException> _exceptions = new List<ExecutionException>();
+
+
+        /// <summary>
+        /// Gets or sets the closed flag. Both accessors take the monitor on this
+        /// instance before touching the backing field.
+        /// </summary>
+        public bool Closed
+        {
+            get
+            {
+                bool snapshot;
+                lock (this)
+                {
+                    snapshot = _closed;
+                }
+                return snapshot;
+            }
+            set
+            {
+                lock (this)
+                {
+                    this._closed = value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets the session name, decoded from the raw name bytes as ASCII text.
+        /// </summary>
+        public string Name
+        {
+            get { return Encoding.ASCII.GetString(_name); }
+        }
+
+        /// <summary>
+        /// Creates a session identified by the given name bytes. The array is stored
+        /// as-is, not copied.
+        /// </summary>
+        /// <param name="name">Raw session name.</param>
+        public Session(byte[] name)
+        {
+            this._name = name;
+        }
+
+        /// <summary>
+        /// Returns the raw name bytes the session was constructed with (the same
+        /// array instance, not a copy).
+        /// </summary>
+        public byte[] getName()
+        {
+            byte[] rawName = _name;
+            return rawName;
+        }
+
+        /// <summary>
+        /// Sets the auto-sync flag while holding the monitor on the outgoing command table.
+        /// </summary>
+        /// <param name="value">New value of the flag.</param>
+        public void setAutoSync(bool value)
+        {
+            Monitor.Enter(_commands);
+            try
+            {
+                _autoSync = value;
+            }
+            finally
+            {
+                Monitor.Exit(_commands);
+            }
+        }
+
+        /// <summary>
+        /// Returns the live table of outgoing commands keyed by command id. The
+        /// dictionary itself is returned, not a copy, and no lock is taken here.
+        /// </summary>
+        public Dictionary<int, Method> getOutstandingCommands()
+        {
+            Dictionary<int, Method> outstanding = _commands;
+            return outstanding;
+        }
+
+        /// <summary>
+        /// Returns the outgoing command count, which is also the id the next
+        /// outgoing command will receive.
+        /// </summary>
+        public int getCommandsOut()
+        {
+            int issued = _commandsOut;
+            return issued;
+        }
+
+        /// <summary>
+        /// Gets or sets the incoming command count.
+        /// </summary>
+        public int CommandsIn
+        {
+            get
+            {
+                return _commandsIn;
+            }
+            set
+            {
+                _commandsIn = value;
+            }
+        }
+
+        /// <summary>
+        /// Returns the current incoming command count as the next id, then advances
+        /// the count by one. No lock is taken.
+        /// </summary>
+        public int nextCommandId()
+        {
+            int id = _commandsIn;
+            _commandsIn = id + 1;
+            return id;
+        }
+
+        /// <summary>
+        /// Assigns the next incoming command id to <paramref name="cmd"/>. Whenever the
+        /// low eight bits of the id are zero (every 256th id) it also calls
+        /// flushProcessed with Option.TIMELY_REPLY.
+        /// </summary>
+        /// <param name="cmd">Incoming command to stamp with an id.</param>
+        public void identify(Method cmd)
+        {
+            int id = nextCommandId();
+            cmd.Id = id;
+
+            if (log.isDebugEnabled())
+            {
+                log.debug("ID: [{0}] {1}", _channel, id);
+            }
+
+            // Periodic flush: ids 0, 256, 512, ...
+            if ((id & 0xff) == 0)
+            {
+                flushProcessed(Option.TIMELY_REPLY);
+            }
+        }
+
+        /// <summary>
+        /// Marks the given command as processed, using its id.
+        /// </summary>
+        public void processed(Method command)
+        {
+            int id = command.Id;
+            processed(id);
+        }
+
+        /// <summary>
+        /// Marks a single command id as processed.
+        /// </summary>
+        public void processed(int command)
+        {
+            Range single = new Range(command, command);
+            processed(single);
+        }
+
+        /// <summary>
+        /// Marks the command ids from <paramref name="lower"/> to
+        /// <paramref name="upper"/> as processed.
+        /// </summary>
+        public void processed(int lower, int upper)
+        {
+            Range span = new Range(lower, upper);
+            processed(span);
+        }
+
+        /// <summary>
+        /// Adds <paramref name="range"/> to the processed set, advances the highest
+        /// processed id when the first range in the set reaches it, and flushes the
+        /// processed set if that advance crossed the current sync point.
+        /// </summary>
+        public void processed(Range range)
+        {
+            log.debug("{0} processed({1})", this, range);
+
+            bool crossedSyncPoint;
+            lock (_processedLock)
+            {
+                _processed.add(range);
+                Range head = _processed.getFirst();
+                int headLower = head.Lower;
+                int headUpper = head.Upper;
+                int previousMax = _maxProcessed;
+                // Only a first range that starts at or before previousMax + 1 extends the mark.
+                if (Serial.le(headLower, previousMax + 1))
+                {
+                    _maxProcessed = Serial.max(previousMax, headUpper);
+                }
+                crossedSyncPoint = Serial.lt(previousMax, _syncPoint)
+                                   && Serial.ge(_maxProcessed, _syncPoint);
+                _syncPoint = _maxProcessed;
+            }
+
+            // The flush itself runs after the lock has been released.
+            if (crossedSyncPoint)
+            {
+                flushProcessed();
+            }
+        }
+
+        /// <summary>
+        /// Copies the processed set under its lock and passes the copy, together
+        /// with <paramref name="options"/>, to sessionCompleted.
+        /// </summary>
+        public void flushProcessed(params Option[] options)
+        {
+            RangeSet snapshot;
+            lock (_processedLock)
+            {
+                snapshot = _processed.copy();
+            }
+            // sessionCompleted is called after the lock has been released.
+            sessionCompleted(snapshot, options);
+        }
+
+        /// <summary>
+        /// Removes from the processed set every id covered by the known-complete set.
+        /// </summary>
+        /// <remarks>
+        /// Each processed range is reduced by all ranges of <paramref name="kc"/> in
+        /// turn, so only ids outside every known-complete range remain. An empty
+        /// <paramref name="kc"/> leaves the processed set unchanged.
+        /// NOTE(review): assumes Range.subtract yields the parts of the receiver that
+        /// are not covered by its argument - confirm against Range.
+        /// </remarks>
+        /// <param name="kc">Ranges the peer reports as known complete.</param>
+        public void knownComplete(RangeSet kc)
+        {
+            lock (_processedLock)
+            {
+                RangeSet newProcessed = new RangeSet();
+                foreach (Range pr in _processed)
+                {
+                    // Pieces of pr still outstanding after the known ranges seen so far.
+                    List<Range> remaining = new List<Range>();
+                    remaining.Add(pr);
+                    foreach (Range kr in kc)
+                    {
+                        List<Range> reduced = new List<Range>();
+                        foreach (Range piece in remaining)
+                        {
+                            foreach (Range rest in piece.subtract(kr))
+                            {
+                                reduced.Add(rest);
+                            }
+                        }
+                        remaining = reduced;
+                    }
+                    foreach (Range survivor in remaining)
+                    {
+                        newProcessed.add(survivor);
+                    }
+                }
+                _processed = newProcessed;
+            }
+        }
+
+        /// <summary>
+        /// Sets the sync point to the id of the last incoming command and flushes
+        /// the processed set at once if processing has already reached that id.
+        /// </summary>
+        public void syncPoint()
+        {
+            int lastIncoming = CommandsIn - 1;
+            log.debug("{0} synced to {1}", this, lastIncoming);
+
+            bool alreadyReached;
+            lock (_processedLock)
+            {
+                _syncPoint = lastIncoming;
+                alreadyReached = Serial.ge(_maxProcessed, _syncPoint);
+            }
+
+            if (!alreadyReached)
+            {
+                return;
+            }
+            flushProcessed();
+        }
+
+        /// <summary>
+        /// Binds this session to <paramref name="channel"/> and points the channel
+        /// back at this session.
+        /// </summary>
+        public void attach(Channel channel)
+        {
+            this._channel = channel;
+            this._channel.Session = this;
+        }
+
+        /// <summary>
+        /// Looks up an outgoing command by id under the command-table lock. The
+        /// dictionary indexer is used, so an id that is not in the table throws.
+        /// </summary>
+        public Method getCommand(int id)
+        {
+            Method found;
+            lock (_commands)
+            {
+                found = _commands[id];
+            }
+            return found;
+        }
+
+        /// <summary>
+        /// Records that the outgoing commands from <paramref name="lower"/> to
+        /// <paramref name="upper"/> are complete: removes them from the command
+        /// table, advances the highest completed id when the range reaches it, and
+        /// wakes every thread waiting on the table.
+        /// </summary>
+        /// <returns>true when the highest completed id moved forward.</returns>
+        public bool complete(int lower, int upper)
+        {
+            // Guarded so the int arguments are only boxed when debug logging is on.
+            if (log.isDebugEnabled())
+            {
+                log.debug("{0} complete({1}, {2})", this, lower, upper);
+            }
+
+            lock (_commands)
+            {
+                int previousMax = _maxComplete;
+
+                int id = Serial.max(previousMax, lower);
+                while (Serial.le(id, upper))
+                {
+                    _commands.Remove(id);
+                    id++;
+                }
+
+                if (Serial.le(lower, previousMax + 1))
+                {
+                    _maxComplete = Serial.max(previousMax, upper);
+                }
+
+                log.debug("{0} commands remaining: {1}", this, _commands);
+                Monitor.PulseAll(_commands);
+                return Serial.gt(_maxComplete, previousMax);
+            }
+        }
+
+        /// <summary>
+        /// Sends a method over the attached channel. Methods on the Frame.L4 track
+        /// are numbered, optionally stored for replay, and followed by a sync when
+        /// auto-sync is on. All other methods are passed straight to the channel.
+        /// </summary>
+        /// <exception cref="SessionException">The session is closed and execution exceptions were recorded.</exception>
+        /// <exception cref="ConnectionException">The session is closed and a connection close was recorded.</exception>
+        /// <exception cref="SessionClosedException">The session is closed for no recorded reason.</exception>
+        protected override void invoke(Method m)
+        {
+            if (Closed)
+            {
+                List<ExecutionException> recorded = getExceptions();
+                if (recorded.Count > 0)
+                {
+                    throw new SessionException(recorded);
+                }
+                if (_close != null)
+                {
+                    throw new ConnectionException(_close);
+                }
+                throw new SessionClosedException();
+            }
+
+            if (!(m.EncodedTrack == Frame.L4))
+            {
+                _channel.method(m);
+                return;
+            }
+
+            lock (_commands)
+            {
+                int id = _commandsOut++;
+                m.Id = id;
+
+                // The very first command is preceded by a command point at 0.
+                if (id == 0)
+                {
+                    sessionCommandPoint(0, 0);
+                }
+                if (ENABLE_REPLAY)
+                {
+                    _commands.Add(id, m);
+                }
+                if (_autoSync)
+                {
+                    m.Sync = true;
+                }
+                _needSync = !m.Sync;
+                _channel.method(m);
+                if (_autoSync)
+                {
+                    sync();
+                }
+
+                // flush every 64K commands to avoid ambiguity on
+                // wraparound
+                if ((id % 65536) == 0)
+                {
+                    sessionFlush(Option.COMPLETED);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Waits for all issued commands to complete, using the default timeout
+        /// held in _timeout.
+        /// </summary>
+        public void sync()
+        {
+            long defaultTimeout = _timeout;
+            sync(defaultTimeout);
+        }
+
+        /// <summary>
+        /// Blocks until every command issued so far is complete, the session is
+        /// closed, or <paramref name="timeout"/> milliseconds have passed.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait, in milliseconds.</param>
+        /// <exception cref="SessionException">The session was closed before the commands completed.</exception>
+        /// <exception cref="Exception">The timeout expired before the commands completed.</exception>
+        public void sync(long timeout)
+        {
+            log.debug("{0} sync()", this);
+            lock (_commands)
+            {
+                int point = _commandsOut - 1;
+
+                // Ask for completion explicitly when the last command was not sent with the sync flag.
+                if (_needSync && Serial.lt(_maxComplete, point))
+                {
+                    executionSync(Option.SYNC);
+                }
+
+                // UTC is used so a local-time shift cannot distort the elapsed time.
+                DateTime start = DateTime.UtcNow;
+                long elapsed = 0;
+
+                while (! Closed && elapsed < timeout && Serial.lt(_maxComplete, point))
+                {
+                    log.debug("{0}   waiting for[{1}]: {2}, {3}", this, point,
+                              _maxComplete, _commands);
+                    // Monitor.Wait takes an int; clamp so a large timeout cannot overflow the cast.
+                    long remaining = Math.Min(timeout - elapsed, (long) int.MaxValue);
+                    Monitor.Wait(_commands, (int) remaining);
+                    // TotalMilliseconds is the whole span; Milliseconds is only its 0-999 component.
+                    elapsed = (long) (DateTime.UtcNow - start).TotalMilliseconds;
+                }
+
+                if (Serial.lt(_maxComplete, point))
+                {
+                    if (Closed)
+                    {
+                        throw new SessionException(getExceptions());
+                    }
+                    else
+                    {
+                        throw new Exception
+                            (String.Format
+                                 ("timed out waiting for sync: complete = {0}, point = {1}", _maxComplete, point));
+                    }
+                }
+            }
+        }
+
+
+        /// <summary>
+        /// Hands the result of a command to the Future registered for it and
+        /// removes that registration.
+        /// </summary>
+        /// <param name="command">Id of the command the result belongs to.</param>
+        /// <param name="result">Result value to store on the future.</param>
+        /// <exception cref="Exception">No future is registered for <paramref name="command"/>.</exception>
+        public void result(int command, Struct result)
+        {
+            Future future;
+            lock (_results)
+            {
+                // Single lookup instead of ContainsKey followed by the indexer.
+                if (!_results.TryGetValue(command, out future))
+                {
+                    throw new Exception(String.Format("Cannot get result {0} for {1}", command, result));
+                }
+                _results.Remove(command);
+            }
+            // Assigned after the table lock has been released.
+            future.Result = result;
+        }
+
+        /// <summary>
+        /// Appends an execution exception to the recorded list, under the list's lock.
+        /// </summary>
+        public void addException(ExecutionException exc)
+        {
+            Monitor.Enter(_exceptions);
+            try
+            {
+                _exceptions.Add(exc);
+            }
+            finally
+            {
+                Monitor.Exit(_exceptions);
+            }
+        }
+
+        private ConnectionClose _close = null;
+
+        /// <summary>
+        /// Stores the connection-close information; invoke uses it to build a
+        /// ConnectionException once the session is closed.
+        /// </summary>
+        public void closeCode(ConnectionClose close)
+        {
+            this._close = close;
+        }
+
+        /// <summary>
+        /// Returns a copy of the recorded execution exceptions, taken under the
+        /// list's lock. Changes to the returned list do not affect the session.
+        /// </summary>
+        public List<ExecutionException> getExceptions()
+        {
+            List<ExecutionException> snapshot;
+            lock (_exceptions)
+            {
+                snapshot = new List<ExecutionException>(_exceptions);
+            }
+            return snapshot;
+        }
+
+        /// <summary>
+        /// Sends a method that produces a result. The future is registered under the
+        /// id the command is about to receive, then the method is invoked.
+        /// </summary>
+        /// <param name="m">Method to send.</param>
+        /// <param name="future">Future that will receive the result.</param>
+        /// <returns>The same <paramref name="future"/> instance.</returns>
+        public override Future invoke(Method m, Future future)
+        {
+            lock (_commands)
+            {
+                future.Session = this;
+                int command = _commandsOut;
+                lock (_results)
+                {
+                    // Indexer assignment replaces a stale entry left under this id (for
+                    // example by an earlier invoke that threw before the id was consumed).
+                    // Dictionary.Add would throw ArgumentException and hide the real error.
+                    _results[command] = future;
+                }
+                invoke(m);
+            }
+            return future;
+        }
+
+
+        /// <summary>
+        /// Transfers a message whose body is given as a byte array. The bytes are
+        /// written into a new MemoryStream that is passed to the stream-based overload.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): the stream is handed over positioned after the written
+        /// bytes - confirm the stream-based overload does not read from the
+        /// current position.
+        /// </remarks>
+        public void messageTransfer(String destination,
+                                    MessageAcceptMode acceptMode,
+                                    MessageAcquireMode acquireMode,
+                                    Header header,
+                                    byte[] body,
+                                    params Option[] options)
+        {
+            MemoryStream payload = new MemoryStream();
+            int length = body.Length;
+            payload.Write(body, 0, length);
+            messageTransfer(destination, acceptMode, acquireMode, header, payload, options);
+        }
+
+        /// <summary>
+        /// Transfers a message whose body is given as text. The text is encoded as
+        /// UTF-8 and sent through the byte-array overload.
+        /// </summary>
+        /// <remarks>
+        /// A null <paramref name="body"/> is sent as an empty body.
+        /// NOTE(review): UTF-8 is assumed as the wire encoding for text bodies - confirm
+        /// against what consumers expect.
+        /// </remarks>
+        public void messageTransfer(String destination,
+                                    MessageAcceptMode acceptMode,
+                                    MessageAcquireMode acquireMode,
+                                    Header header,
+                                    String body,
+                                    params Option[] options)
+        {
+            // Convert.ToByte(body) parsed the text as one number, which then selected the
+            // MemoryStream(int capacity) constructor: the text itself was never sent.
+            byte[] payload = body == null ? new byte[0] : Encoding.UTF8.GetBytes(body);
+            messageTransfer(destination, acceptMode, acquireMode, header, payload, options);
+        }
+
+        /// <summary>
+        /// Requests a zero session timeout, detaches the session by name, then waits
+        /// until the session is marked closed or the default timeout has passed.
+        /// </summary>
+        public void close()
+        {
+            sessionRequestTimeout(0);
+            sessionDetach(_name);
+            lock (_commands)
+            {
+                // UTC is used so a local-time shift cannot distort the elapsed time.
+                DateTime start = DateTime.UtcNow;
+                long elapsed = 0;
+
+                while (! Closed && elapsed < _timeout)
+                {
+                    Monitor.Wait(_commands, (int) (_timeout - elapsed));
+                    // TotalMilliseconds is the whole span; Milliseconds is only its 0-999
+                    // component. Clamped at zero so a clock step cannot inflate the next wait.
+                    elapsed = Math.Max(0L, (long) (DateTime.UtcNow - start).TotalMilliseconds);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Logs the given exception at error level. Nothing else is done with it.
+        /// </summary>
+        public void exception(Exception t)
+        {
+            const string message = "Caught exception";
+            log.error(t, message);
+        }
+
+        /// <summary>
+        /// Marks the session closed, wakes every thread waiting on the command table,
+        /// pulses the monitor of each future still awaiting a result, and detaches the
+        /// session from its channel if one is attached.
+        /// </summary>
+        public void closed()
+        {
+            Closed = true;
+            lock (_commands)
+            {
+                Monitor.PulseAll(_commands);
+            }
+            lock (_results)
+            {
+                foreach (Future result in _results.Values)
+                {
+                    lock (result)
+                    {
+                        Monitor.PulseAll(result);
+                    }
+                }
+            }
+
+            // The channel may be null (never attached, or already detached by an
+            // earlier call), so only detach when there is one.
+            Channel channel = _channel;
+            if (channel != null)
+            {
+                channel.Session = null;
+                _channel = null;
+            }
+        }
+
+        /// <summary>
+        /// Returns "session:" followed by the session name decoded as ASCII. A
+        /// session without a name yields "session:".
+        /// </summary>
+        public String toString()
+        {
+            // Formatting the byte[] directly would print its type name, not the name.
+            return String.Format("session:{0}", _name == null ? null : Name);
+        }
+
+        /// <summary>
+        /// Same text as <see cref="toString"/>, so that composite formatting of this
+        /// instance (for example "{0}" in log calls) shows the session name.
+        /// </summary>
+        public override string ToString()
+        {
+            return toString();
+        }
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/SessionDelegate.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/SessionDelegate.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/SessionDelegate.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/SessionDelegate.cs Thu Dec  3 22:03:51 2009
@@ -1,126 +1,126 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-namespace org.apache.qpid.transport
-{
-    /// <summary> 
-    /// SessionDelegate
-    /// 
-    /// </summary>
-    public abstract class SessionDelegate : MethodDelegate<Session>, ProtocolDelegate<Session>
-    {
-        public void Init(Session ssn, ProtocolHeader hdr)
-        {
-        }
-
-        public void Control(Session ssn, Method method)
-        {
-            method.dispatch(ssn, this);
-        }
-
-        public void Command(Session ssn, Method method)
-        {
-            ssn.identify(method);
-            method.dispatch(ssn, this);
-            if (!method.hasPayload())
-            {
-                ssn.processed(method);
-            }
-        }
-
-        public void Error(Session ssn, ProtocolError error)
-        {
-        }
-
-        public override void executionResult(Session ssn, ExecutionResult result)
-        {
-            ssn.result(result.getCommandId(), result.getValue());
-        }
-
-        public override void executionException(Session ssn, ExecutionException exc)
-        {
-            ssn.addException(exc);
-        }
-
-        public override void sessionCompleted(Session ssn, SessionCompleted cmp)
-        {           
-                RangeSet ranges = cmp.getCommands();
-                RangeSet known = null;
-                if (cmp.getTimelyReply())
-                {
-                    known = new RangeSet();
-                }
-
-                if (ranges != null)
-                {
-                    foreach (Range range in ranges)
-                    {
-                        bool advanced = ssn.complete(range.Lower, range.Upper);
-                        if (advanced && known != null)
-                        {
-                            known.add(range);
-                        }
-                    }
-                }
-
-                if (known != null)
-                {
-                    ssn.sessionKnownCompleted(known);
-                }           
-        }
-
-        public override void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp)
-        {
-            RangeSet kc = kcmp.getCommands();
-            if (kc != null)
-            {
-                ssn.knownComplete(kc);
-            }
-        }
-
-        public override void sessionFlush(Session ssn, SessionFlush flush)
-        {
-            if (flush.getCompleted())
-            {
-                ssn.flushProcessed();
-            }
-            if (flush.getConfirmed())
-            {
-                ssn.flushProcessed();
-            }
-            if (flush.getExpected())
-            {
-               // to be done
-                //throw new Exception("not implemented");
-            }
-        }
-
-        public override void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
-        {
-            ssn.CommandsIn = scp.getCommandId();
-        }
-
-        public override void executionSync(Session ssn, ExecutionSync sync)
-        {
-            ssn.syncPoint();
-        }
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+namespace org.apache.qpid.transport
+{
+    /// <summary>
+    /// Base delegate that dispatches session controls and commands back to itself
+    /// and forwards execution/session bookkeeping events to the target Session.
+    /// </summary>
+    public abstract class SessionDelegate : MethodDelegate<Session>, ProtocolDelegate<Session>
+    {
+        public void Init(Session ssn, ProtocolHeader hdr) // No-op: the protocol header is ignored here.
+        {
+        }
+
+        public void Control(Session ssn, Method method) // Dispatches the control to the matching handler on this delegate.
+        {
+            method.dispatch(ssn, this);
+        }
+
+        public void Command(Session ssn, Method method) // Identifies, dispatches, and (if payload-free) marks the command processed.
+        {
+            ssn.identify(method); // presumably assigns the incoming command id - confirm in Session
+            method.dispatch(ssn, this);
+            if (!method.hasPayload()) // commands carrying a payload are not marked processed here
+            {
+                ssn.processed(method);
+            }
+        }
+
+        public void Error(Session ssn, ProtocolError error) // No-op: protocol errors are ignored at this level.
+        {
+        }
+
+        public override void executionResult(Session ssn, ExecutionResult result) // Hands the result value to the session, keyed by command id.
+        {
+            ssn.result(result.getCommandId(), result.getValue());
+        }
+
+        public override void executionException(Session ssn, ExecutionException exc) // Records the exception on the session.
+        {
+            ssn.addException(exc);
+        }
+
+        public override void sessionCompleted(Session ssn, SessionCompleted cmp) // Applies completed ranges; replies with known-completed if asked.
+        {
+                RangeSet ranges = cmp.getCommands();
+                RangeSet known = null; // stays null unless the peer asked for a timely reply
+                if (cmp.getTimelyReply())
+                {
+                    known = new RangeSet();
+                }
+
+                if (ranges != null)
+                {
+                    foreach (Range range in ranges)
+                    {
+                        bool advanced = ssn.complete(range.Lower, range.Upper);
+                        if (advanced && known != null) // only ranges for which complete() returned true are echoed back
+                        {
+                            known.add(range);
+                        }
+                    }
+                }
+
+                if (known != null) // sent whenever a timely reply was requested, even if the set is empty
+                {
+                    ssn.sessionKnownCompleted(known);
+                }
+        }
+
+        public override void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp) // Forwards the known-complete ranges, if any.
+        {
+            RangeSet kc = kcmp.getCommands();
+            if (kc != null)
+            {
+                ssn.knownComplete(kc);
+            }
+        }
+
+        public override void sessionFlush(Session ssn, SessionFlush flush) // Reacts to the completed/confirmed/expected flush flags.
+        {
+            if (flush.getCompleted())
+            {
+                ssn.flushProcessed();
+            }
+            if (flush.getConfirmed()) // NOTE(review): flushProcessed() runs twice when both flags are set - confirm intended
+            {
+                ssn.flushProcessed();
+            }
+            if (flush.getExpected()) // the "expected" flag is accepted but currently has no effect
+            {
+               // to be done
+                //throw new Exception("not implemented");
+            }
+        }
+
+        public override void sessionCommandPoint(Session ssn, SessionCommandPoint scp) // Sets the session's CommandsIn to the given command id.
+        {
+            ssn.CommandsIn = scp.getCommandId();
+        }
+
+        public override void executionSync(Session ssn, ExecutionSync sync) // Delegates to Session.syncPoint().
+        {
+            ssn.syncPoint();
+        }
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/Struct.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/Struct.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/Struct.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/Struct.cs Thu Dec  3 22:03:51 2009
@@ -1,122 +1,122 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.Collections.Generic;
-using System.Text;
-using Decoder = org.apache.qpid.transport.codec.Decoder;
-using Encodable = org.apache.qpid.transport.codec.Encodable;
-using Encoder = org.apache.qpid.transport.codec.Encoder;
-namespace org.apache.qpid.transport
-{
-	/// <summary> 
-	/// Struct
-	/// </summary>
-
-    public abstract class Struct : Encodable
-    {
-        public  static Struct create(int type)
-        {
-            return StructFactory.create(type);
-        }
-
-        bool dirty = true;
-
-        public bool Dirty
-        {
-            get { return dirty; }
-            set { dirty = value; }
-        }
-
-        public abstract int getStructType();
-
-        public abstract int getSizeWidth();
-
-        public abstract int getPackWidth();
-
-        public int getEncodedType()
-        {
-            int type = getStructType();
-            if (type < 0)
-            {
-                throw new Exception();
-            }
-            return type;
-        }
-
-        private bool isBit<C, T>(Field<C, T> f)
-        {
-            return Equals(f.Type, typeof(Boolean));
-        }
-
-        private bool packed()
-        {
-            return getPackWidth() > 0;
-        }
-
-        private bool encoded<C, T>(Field<C, T> f)
-        {
-            return !packed() || !isBit(f) && f.has(this);
-        }
-
-        private int getFlagWidth()
-        {
-            return (Fields.Count + 7) / 8;
-        }
-
-        private int getFlagCount()
-        {
-            return 8 * getPackWidth();
-        }
-
-        public abstract void read(Decoder dec);
-
-        public abstract void write(Encoder enc);
-
-        public abstract Dictionary<String, Object> Fields
-        {
-            get;
-        }
-
-        public String toString()
-        {
-            StringBuilder str = new StringBuilder();
-            str.Append(GetType());
-            str.Append("(");
-            bool first = true;
-            foreach (KeyValuePair<String, Object> me in Fields)
-            {
-                if (first)
-                {
-                    first = false;
-                }
-                else
-                {
-                    str.Append(", ");
-                }
-                str.Append(me.Key);
-                str.Append("=");
-                str.Append(me.Value);
-            }
-            str.Append(")");
-            return str.ToString();
-        }
-    }
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Decoder = org.apache.qpid.transport.codec.Decoder;
+using Encodable = org.apache.qpid.transport.codec.Encodable;
+using Encoder = org.apache.qpid.transport.codec.Encoder;
+namespace org.apache.qpid.transport
+{
+	/// <summary>
+	/// Base class for encodable structs: type/width metadata, a dirty flag, and a field-dump toString().
+	/// </summary>
+
+    public abstract class Struct : Encodable
+    {
+        public  static Struct create(int type) // Factory shortcut: delegates to StructFactory.create(type).
+        {
+            return StructFactory.create(type);
+        }
+
+        bool dirty = true; // a new struct starts out dirty
+
+        public bool Dirty // Plain accessor for the dirty flag.
+        {
+            get { return dirty; }
+            set { dirty = value; }
+        }
+
+        public abstract int getStructType(); // type code; a negative value is rejected by getEncodedType()
+
+        public abstract int getSizeWidth(); // width used for the size prefix (see readStruct/readSize in the decoder)
+
+        public abstract int getPackWidth(); // pack width; > 0 means the struct is packed (see packed())
+
+        public int getEncodedType() // Returns getStructType(); throws Exception when it is negative.
+        {
+            int type = getStructType();
+            if (type < 0)
+            {
+                throw new Exception("struct type must not be negative: " + type); // was a message-less Exception
+            }
+            return type;
+        }
+
+        private bool isBit<C, T>(Field<C, T> f) // True when the field's Type is Boolean.
+        {
+            return Equals(f.Type, typeof(Boolean));
+        }
+
+        private bool packed() // True when the pack width is positive.
+        {
+            return getPackWidth() > 0;
+        }
+
+        private bool encoded<C, T>(Field<C, T> f) // && binds tighter: !packed() || (!isBit(f) && f.has(this)).
+        {
+            return !packed() || !isBit(f) && f.has(this);
+        }
+
+        private int getFlagWidth() // Number of bytes needed for one bit per entry in Fields, rounded up.
+        {
+            return (Fields.Count + 7) / 8;
+        }
+
+        private int getFlagCount() // Number of bits available in the pack width.
+        {
+            return 8 * getPackWidth();
+        }
+
+        public abstract void read(Decoder dec); // populate this struct from the decoder
+
+        public abstract void write(Encoder enc); // write this struct to the encoder
+
+        public abstract Dictionary<String, Object> Fields // name -> value pairs; used by toString()
+        {
+            get;
+        }
+
+        public String toString() // Renders "Type(key=value, ...)". Not an override of Object.ToString().
+        {
+            StringBuilder str = new StringBuilder();
+            str.Append(GetType());
+            str.Append("(");
+            bool first = true; // suppresses the separator before the first field
+            foreach (KeyValuePair<String, Object> me in Fields)
+            {
+                if (first)
+                {
+                    first = false;
+                }
+                else
+                {
+                    str.Append(", ");
+                }
+                str.Append(me.Key);
+                str.Append("=");
+                str.Append(me.Value);
+            }
+            str.Append(")");
+            return str.ToString();
+        }
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/dotnet/client-010/client/transport/codec/AbstractDecoder.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/client-010/client/transport/codec/AbstractDecoder.cs?rev=886940&r1=886939&r2=886940&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/client-010/client/transport/codec/AbstractDecoder.cs (original)
+++ qpid/trunk/qpid/dotnet/client-010/client/transport/codec/AbstractDecoder.cs Thu Dec  3 22:03:51 2009
@@ -1,399 +1,399 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Text;
-using org.apache.qpid.transport.util;
-
-namespace org.apache.qpid.transport.codec
-{
-    /// <summary> 
-    /// AbstractDecoder
-    /// </summary>
-    public abstract class AbstractDecoder : Decoder
-    {
-        private readonly Dictionary<Binary, String> str8cache = new Dictionary<Binary, String>();
-
-        protected abstract byte doGet();
-
-        protected abstract void doGet(byte[] bytes);
-        public abstract bool hasRemaining();
-
-        protected byte get()
-        {
-            return doGet();
-        }
-
-        protected void get(byte[] bytes)
-        {
-            doGet(bytes);
-        }
-
-        protected Binary get(int size)
-        {
-            byte[] bytes = new byte[size];
-            get(bytes);
-            return new Binary(bytes);
-        }
-
-        protected short uget()
-        {
-            return (short) (0xFF & get());
-        }
-
-        public virtual short readUint8()
-        {
-            return uget();
-        }
-
-        public abstract int readUint16();
-       
-
-        public abstract long readUint32();
-      
-
-        public int readSequenceNo()
-        {
-            return (int) readUint32();
-        }
-
-        public virtual long readUint64()
-        {
-            long l = 0;
-            for (int i = 0; i < 8; i++)
-            {
-                l |= ((long) (0xFF & get())) << (56 - i*8);
-            }
-            return l;
-        }
-
-        public abstract short readInt8();
-        public abstract int readInt16();       
-        public abstract long readInt32() ;
-        public abstract long readInt64();     
-        public abstract float readFloat() ;  
-        public abstract double readDouble() ;          
-
-        public long readDatetime()
-        {
-            return readUint64();
-        }
-
-        private static String decode(byte[] bytes, int offset, int length, Encoding encoding)
-        {
-            return encoding.GetString(bytes, offset, length);
-        }
-
-        private static String decode(byte[] bytes, Encoding encoding)
-        {
-            return decode(bytes, 0, bytes.Length, encoding);
-        }
-
-        public String readStr8()
-        {
-            short size = readUint8();
-            Binary bin = get(size);
-            String str;
-            if (! str8cache.TryGetValue(bin, out str))
-            {
-                str = decode(bin.array(), bin.offset(), bin.size(), Encoding.UTF8);
-                str8cache.Add(bin, str);
-            }
-            return str;
-        }
-
-        public String readStr16()
-        {
-            int size = readUint16();
-            byte[] bytes = new byte[size];
-            get(bytes);
-            return decode(bytes, Encoding.UTF8);
-        }
-
-        public byte[] readVbin8()
-        {
-            int size = readUint8();
-            byte[] bytes = new byte[size];
-            get(bytes);
-            return bytes;
-        }
-
-        public byte[] readVbin16()
-        {
-            int size = readUint16();
-            byte[] bytes = new byte[size];
-            get(bytes);
-            return bytes;
-        }
-
-        public byte[] readVbin32()
-        {
-            int size = (int) readUint32();
-            byte[] bytes = new byte[size];
-            get(bytes);
-            return bytes;
-        }
-
-        public RangeSet readSequenceSet()
-        {
-            int count = readUint16()/8;
-            if (count == 0)
-            {
-                return null;
-            }
-            RangeSet ranges = new RangeSet();
-            for (int i = 0; i < count; i++)
-            {
-                ranges.add(readSequenceNo(), readSequenceNo());
-            }
-            return ranges;
-        }
-
-        public RangeSet readByteRanges()
-        {
-            throw new Exception("not implemented");
-        }
-
-        public UUID readUuid()
-        {
-            long msb = readUint64();
-            long lsb = readUint64();
-            return new UUID(msb, lsb);
-        }
-
-        public String readContent()
-        {
-            throw new Exception("Deprecated");
-        }
-
-        public Struct readStruct(int type)
-        {
-            Struct st = Struct.create(type);
-            int width = st.getSizeWidth();
-            if (width > 0)
-            {
-                long size = readSize(width);
-                if (size == 0)
-                {
-                    return null;
-                }
-            }
-            if (type > 0)
-            {
-                int code = readUint16();
-                Debug.Assert(code == type);
-            }
-            st.read(this);
-            return st;
-        }
-
-        public Struct readStruct32()
-        {
-            long size = readUint32();
-            if (size == 0)
-            {
-                return null;
-            }
-            int type = readUint16();
-            Struct result = Struct.create(type);
-            result.read(this);
-            return result;
-        }
-
-        public Dictionary<String, Object> readMap()
-        {
-            long size = readUint32();
-
-            if (size == 0)
-            {
-                return null;
-            }
-
-            long count = readUint32();
-
-            Dictionary<String, Object> result = new Dictionary<String, Object>();
-            for (int i = 0; i < count; i++)
-            {
-                String key = readStr8();
-                byte code = get();
-                QpidType t = getType(code);
-                Object value = read(t);
-                result.Add(key, value);
-            }
-
-            return result;
-        }
-
-        public List<Object> readList()
-        {
-            long size = readUint32();
-
-            if (size == 0)
-            {
-                return null;
-            }
-
-            long count = readUint32();
-
-            List<Object> result = new List<Object>();
-            for (int i = 0; i < count; i++)
-            {
-                byte code = get();
-                QpidType t = getType(code);
-                Object value = read(t);
-                result.Add(value);
-            }
-            return result;
-        }
-
-        public List<Object> readArray()
-        {
-            long size = readUint32();
-
-            if (size == 0)
-            {
-                return null;
-            }
-
-            byte code = get();
-            QpidType t = getType(code);
-            long count = readUint32();
-
-            List<Object> result = new List<Object>();
-            for (int i = 0; i < count; i++)
-            {
-                Object value = read(t);
-                result.Add(value);
-            }
-            return result;
-        }
-
-        private QpidType getType(byte code)
-        {
-            return QpidType.get(code);
-        }
-
-        private long readSize(QpidType t)
-        {
-            return t.Fixed ? t.Width : readSize(t.Width);
-        }
-
-        private long readSize(int width)
-        {
-            switch (width)
-            {
-                case 1:
-                    return readUint8();
-                case 2:
-                    return readUint16();
-                case 4:
-                    return readUint32();
-                default:
-                    throw new Exception("illegal width: " + width);
-            }
-        }
-
-        private byte[] readBytes(QpidType t)
-        {
-            long size = readSize(t);
-            byte[] result = new byte[(int) size];
-            get(result);
-            return result;
-        }
-
-        private Object read(QpidType t)
-        {
-            switch (t.Code)
-            {
-                case Code.BIN8:
-                case Code.UINT8:
-                    return readUint8();
-                case Code.INT8:
-                    return get();
-                case Code.CHAR:
-                    return (char) get();
-                case Code.BOOLEAN:
-                    return get() > 0;
-
-                case Code.BIN16:
-                case Code.UINT16:
-                    return readUint16();
-                case Code.INT16:
-                    return (short) readUint16();
-
-                case Code.BIN32:
-                case Code.UINT32:
-                    return readUint32();
-
-                case Code.CHAR_UTF32:
-                case Code.INT32:
-                    return (int) readUint32();
-
-                case Code.FLOAT:                    
-                    return  (float)BitConverter.Int64BitsToDouble(readUint32() << 32);
-                           
-                case Code.BIN64:
-                case Code.UINT64:
-                case Code.INT64:
-                case Code.DATETIME:
-                    return readUint64();
-
-                case Code.DOUBLE:                   
-                    return BitConverter.Int64BitsToDouble(readUint64());
-                case Code.UUID:
-                    return readUuid();
-                case Code.STR8:
-                    return readStr8();
-                case Code.STR16:
-                    return readStr16();
-                case Code.STR8_LATIN:
-                case Code.STR8_UTF16:
-                case Code.STR16_LATIN:
-                case Code.STR16_UTF16:
-                    // XXX: need to do character conversion
-                    return Encoding.UTF8.GetString(readBytes(t));
-
-                case Code.MAP:
-                    return readMap();
-                case Code.LIST:
-                    return readList();
-                case Code.ARRAY:
-                    return readArray();
-                case Code.STRUCT32:
-                    return readStruct32();
-
-                case Code.BIN40:
-                case Code.DEC32:
-                case Code.BIN72:
-                case Code.DEC64:
-                    // XXX: what types are we supposed to use here?
-                    return readBytes(t);
-
-                case Code.VOID:
-                    return null;
-
-                default:
-                    return readBytes(t);
-            }
-        }
-    }
-}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Text;
+using org.apache.qpid.transport.util;
+
+namespace org.apache.qpid.transport.codec
+{
+    /// <summary> 
+    /// AbstractDecoder
+    /// </summary>
+    public abstract class AbstractDecoder : Decoder
+    {
+        private readonly Dictionary<Binary, String> str8cache = new Dictionary<Binary, String>();
+
+        protected abstract byte doGet();
+
+        protected abstract void doGet(byte[] bytes);
+        public abstract bool hasRemaining();
+
+        protected byte get() // Reads one raw byte by delegating to the subclass's doGet().
+        {
+            return doGet();
+        }
+
+        protected void get(byte[] bytes) // Hands the buffer to doGet(byte[]); callers here size it to the bytes wanted.
+        {
+            doGet(bytes);
+        }
+
+        protected Binary get(int size) // Reads into a fresh size-byte array and wraps it in a Binary.
+        {
+            byte[] bytes = new byte[size];
+            get(bytes);
+            return new Binary(bytes);
+        }
+
+        protected short uget() // Reads one byte and widens it to short (0..255).
+        {
+            return (short) (0xFF & get()); // C# byte is already unsigned, so the mask does not change the value
+        }
+
+        public virtual short readUint8() // Reads an unsigned 8-bit value, returned as short via uget().
+        {
+            return uget();
+        }
+
+        public abstract int readUint16();
+       
+
+        public abstract long readUint32();
+      
+
+        public int readSequenceNo() // Reads a uint32 and casts it to int (unchecked by default: wraps above int.MaxValue).
+        {
+            return (int) readUint32();
+        }
+
+        public virtual long readUint64() // Assembles 8 bytes, most significant first, into a long.
+        {
+            long l = 0;
+            for (int i = 0; i < 8; i++)
+            {
+                l |= ((long) (0xFF & get())) << (56 - i*8); // byte 0 lands in bits 63..56, byte 7 in bits 7..0
+            }
+            return l; // signed long: values with the top bit set come back negative
+        }
+
+        public abstract short readInt8();
+        public abstract int readInt16();       
+        public abstract long readInt32() ;
+        public abstract long readInt64();     
+        public abstract float readFloat() ;  
+        public abstract double readDouble() ;          
+
+        public long readDatetime() // Reads a datetime as a raw 64-bit value; no conversion is applied here.
+        {
+            return readUint64();
+        }
+
+        private static String decode(byte[] bytes, int offset, int length, Encoding encoding) // Decodes a slice of bytes with the given encoding.
+        {
+            return encoding.GetString(bytes, offset, length);
+        }
+
+        private static String decode(byte[] bytes, Encoding encoding) // Decodes the whole array with the given encoding.
+        {
+            return decode(bytes, 0, bytes.Length, encoding);
+        }
+
+        public String readStr8() // Reads a string with a uint8 length prefix, decoded as UTF-8 and cached by its bytes.
+        {
+            short size = readUint8();
+            Binary bin = get(size);
+            String str;
+            if (! str8cache.TryGetValue(bin, out str)) // cache hit skips decoding
+            {
+                str = decode(bin.array(), bin.offset(), bin.size(), Encoding.UTF8);
+                str8cache.Add(bin, str); // NOTE(review): no eviction is visible in this chunk - cache may grow unbounded
+            }
+            return str;
+        }
+
+        public String readStr16() // Reads a string with a uint16 length prefix, decoded as UTF-8 (not cached).
+        {
+            int size = readUint16();
+            byte[] bytes = new byte[size];
+            get(bytes);
+            return decode(bytes, Encoding.UTF8);
+        }
+
+        public byte[] readVbin8() // Reads a byte array with a uint8 length prefix.
+        {
+            int size = readUint8();
+            byte[] bytes = new byte[size];
+            get(bytes);
+            return bytes;
+        }
+
+        public byte[] readVbin16() // Reads a byte array with a uint16 length prefix.
+        {
+            int size = readUint16();
+            byte[] bytes = new byte[size];
+            get(bytes);
+            return bytes;
+        }
+
+        public byte[] readVbin32() // Reads a byte array with a uint32 length prefix.
+        {
+            int size = (int) readUint32(); // cast to int: lengths above int.MaxValue are not representable here
+            byte[] bytes = new byte[size];
+            get(bytes);
+            return bytes;
+        }
+
+        public RangeSet readSequenceSet() // Reads a uint16 byte length, then that many 8-byte ranges; null when there are none.
+        {
+            int count = readUint16()/8; // each range is two readSequenceNo() calls, i.e. two uint32 reads = 8 bytes
+            if (count == 0)
+            {
+                return null; // callers must handle null for an empty set
+            }
+            RangeSet ranges = new RangeSet();
+            for (int i = 0; i < count; i++)
+            {
+                ranges.add(readSequenceNo(), readSequenceNo()); // C# evaluates arguments left to right: first read is the first argument
+            }
+            return ranges;
+        }
+
+        public RangeSet readByteRanges() // Not implemented: always throws.
+        {
+            throw new Exception("not implemented");
+        }
+
+        public UUID readUuid() // Reads two 64-bit values (most significant half first) and builds a UUID.
+        {
+            long msb = readUint64();
+            long lsb = readUint64();
+            return new UUID(msb, lsb);
+        }
+
+        public String readContent() // Deprecated: always throws.
+        {
+            throw new Exception("Deprecated");
+        }
+
+        public Struct readStruct(int type) // Reads a struct of a known type; null when its size prefix is 0.
+        {
+            Struct st = Struct.create(type);
+            int width = st.getSizeWidth();
+            if (width > 0) // a size prefix is present only when the struct declares a size width
+            {
+                long size = readSize(width);
+                if (size == 0)
+                {
+                    return null;
+                }
+            }
+            if (type > 0) // for positive types a uint16 type code precedes the body
+            {
+                int code = readUint16();
+                Debug.Assert(code == type); // Debug.Assert is compiled out without DEBUG, so release builds do not check this
+            }
+            st.read(this);
+            return st;
+        }
+
+        /// <summary>
+        /// Reads a struct preceded by a 32-bit size and a 16-bit type code.
+        /// </summary>
+        /// <returns>The populated struct, or null when the encoded size is zero.</returns>
+        public Struct readStruct32()
+        {
+            // Only the zero/non-zero distinction of the size is used here.
+            if (readUint32() == 0)
+            {
+                return null;
+            }
+
+            int typeCode = readUint16();
+            Struct decoded = Struct.create(typeCode);
+            decoded.read(this);
+            return decoded;
+        }
+
+        /// <summary>
+        /// Reads a map: a 32-bit size, a 32-bit entry count, then for each entry
+        /// a key read with <c>readStr8</c>, a one-byte type code and a value of
+        /// that type.
+        /// </summary>
+        /// <returns>The decoded entries, or null when the encoded size is zero.</returns>
+        public Dictionary<String, Object> readMap()
+        {
+            if (readUint32() == 0)
+            {
+                return null;
+            }
+
+            long entries = readUint32();
+            Dictionary<String, Object> map = new Dictionary<String, Object>();
+
+            int index = 0;
+            while (index < entries)
+            {
+                String name = readStr8();
+                QpidType valueType = getType(get());
+                // Add throws on a repeated key, so duplicates on the wire are rejected.
+                map.Add(name, read(valueType));
+                index++;
+            }
+
+            return map;
+        }
+
+        /// <summary>
+        /// Reads a list: a 32-bit size, a 32-bit element count, then for each
+        /// element a one-byte type code followed by a value of that type.
+        /// </summary>
+        /// <returns>The decoded elements, or null when the encoded size is zero.</returns>
+        public List<Object> readList()
+        {
+            if (readUint32() == 0)
+            {
+                return null;
+            }
+
+            long elements = readUint32();
+            List<Object> items = new List<Object>();
+
+            int index = 0;
+            while (index < elements)
+            {
+                // Each element carries its own type code.
+                QpidType elementType = getType(get());
+                items.Add(read(elementType));
+                index++;
+            }
+            return items;
+        }
+
+        /// <summary>
+        /// Reads an array: a 32-bit size, a single one-byte type code shared by
+        /// all elements, a 32-bit element count, then the elements themselves.
+        /// </summary>
+        /// <returns>The decoded elements, or null when the encoded size is zero.</returns>
+        public List<Object> readArray()
+        {
+            if (readUint32() == 0)
+            {
+                return null;
+            }
+
+            // Unlike a list, the element type is stated once, before the count.
+            QpidType elementType = getType(get());
+            long elements = readUint32();
+
+            List<Object> items = new List<Object>();
+            int index = 0;
+            while (index < elements)
+            {
+                items.Add(read(elementType));
+                index++;
+            }
+            return items;
+        }
+
+        /// <summary>
+        /// Looks up the type descriptor for a one-byte type code via
+        /// <c>QpidType.get</c>.
+        /// </summary>
+        /// <param name="code">Type code read from the buffer.</param>
+        /// <returns>Whatever <c>QpidType.get</c> returns for the code.</returns>
+        private QpidType getType(byte code)
+        {
+            QpidType resolved = QpidType.get(code);
+            return resolved;
+        }
+
+        /// <summary>
+        /// Determines the payload size of a value of the given type: the type's
+        /// own width when it is fixed, otherwise a size prefix of that width
+        /// read from the buffer.
+        /// </summary>
+        /// <param name="t">Type descriptor supplying <c>Fixed</c> and <c>Width</c>.</param>
+        /// <returns>The payload size.</returns>
+        private long readSize(QpidType t)
+        {
+            if (t.Fixed)
+            {
+                return t.Width;
+            }
+            return readSize(t.Width);
+        }
+
+        /// <summary>
+        /// Reads an unsigned size prefix occupying the given number of bytes.
+        /// </summary>
+        /// <param name="width">Width of the prefix in bytes; must be 1, 2 or 4.</param>
+        /// <returns>The size value read from the buffer.</returns>
+        /// <exception cref="Exception">Thrown for any other width.</exception>
+        private long readSize(int width)
+        {
+            if (width == 1)
+            {
+                return readUint8();
+            }
+            if (width == 2)
+            {
+                return readUint16();
+            }
+            if (width == 4)
+            {
+                return readUint32();
+            }
+            throw new Exception("illegal width: " + width);
+        }
+
+        /// <summary>
+        /// Reads the raw payload bytes of a value of the given type, using
+        /// <c>readSize</c> to determine how many bytes to take.
+        /// </summary>
+        /// <param name="t">Type descriptor of the value being read.</param>
+        /// <returns>A newly allocated array filled from the buffer.</returns>
+        private byte[] readBytes(QpidType t)
+        {
+            // The size is narrowed to int because array sizes are int-based.
+            int length = (int) readSize(t);
+            byte[] buffer = new byte[length];
+            get(buffer);
+            return buffer;
+        }
+
+        /// <summary>
+        /// Decodes a single value of the given type from the buffer, dispatching
+        /// on the type's code.
+        /// </summary>
+        /// <param name="t">Type descriptor; its <c>Code</c> selects the branch.</param>
+        /// <returns>
+        /// The decoded value (boxed for primitives), a raw byte array for types
+        /// with no dedicated branch, or null for <c>Code.VOID</c>.
+        /// </returns>
+        private Object read(QpidType t)
+        {
+            switch (t.Code)
+            {
+                case Code.BIN8:
+                case Code.UINT8:
+                    return readUint8();
+                case Code.INT8:
+                    // NOTE(review): this returns the value of get() unconverted, which
+                    // appears to be an unsigned byte - confirm whether callers expect
+                    // a signed value here.
+                    return get();
+                case Code.CHAR:
+                    return (char) get();
+                case Code.BOOLEAN:
+                    return get() > 0;
+
+                case Code.BIN16:
+                case Code.UINT16:
+                    return readUint16();
+                case Code.INT16:
+                    return (short) readUint16();
+
+                case Code.BIN32:
+                case Code.UINT32:
+                    return readUint32();
+
+                case Code.CHAR_UTF32:
+                case Code.INT32:
+                    return (int) readUint32();
+
+                case Code.FLOAT:
+                    // Reinterpret the 32 bits as an IEEE-754 single. Shifting them into
+                    // the top half of a double and narrowing gives a wrong value because
+                    // the two formats use different exponent widths and biases.
+                    // GetBytes and ToSingle both use host byte order, so the round trip
+                    // is endianness-neutral.
+                    return BitConverter.ToSingle(BitConverter.GetBytes(unchecked((int) readUint32())), 0);
+
+                case Code.BIN64:
+                case Code.UINT64:
+                case Code.INT64:
+                case Code.DATETIME:
+                    return readUint64();
+
+                case Code.DOUBLE:
+                    return BitConverter.Int64BitsToDouble(readUint64());
+                case Code.UUID:
+                    return readUuid();
+                case Code.STR8:
+                    return readStr8();
+                case Code.STR16:
+                    return readStr16();
+                case Code.STR8_LATIN:
+                case Code.STR8_UTF16:
+                case Code.STR16_LATIN:
+                case Code.STR16_UTF16:
+                    // XXX: need to do character conversion
+                    return Encoding.UTF8.GetString(readBytes(t));
+
+                case Code.MAP:
+                    return readMap();
+                case Code.LIST:
+                    return readList();
+                case Code.ARRAY:
+                    return readArray();
+                case Code.STRUCT32:
+                    return readStruct32();
+
+                case Code.BIN40:
+                case Code.DEC32:
+                case Code.BIN72:
+                case Code.DEC64:
+                    // XXX: what types are we supposed to use here?
+                    return readBytes(t);
+
+                case Code.VOID:
+                    return null;
+
+                default:
+                    // Types without a dedicated branch are returned as raw bytes.
+                    return readBytes(t);
+            }
+        }
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message