qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1457505 [8/14] - in /qpid/trunk/qpid/java: amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/ amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/c...
Date Sun, 17 Mar 2013 18:03:43 GMT
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java Sun Mar 17 18:03:37 2013
@@ -1,580 +1,580 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.framing;
-
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.codec.ValueWriter;
-import org.apache.qpid.amqp_1_0.transport.BytesProcessor;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-
-import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.transport.Open;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedShort;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class ConnectionHandler
-{
-    private final ConnectionEndpoint _connection;
-    private ProtocolHandler _delegate;
-
-    private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
-    private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
-
-    public ConnectionHandler(final ConnectionEndpoint connection)
-    {
-        _connection = connection;
-        _delegate = new ProtocolHeaderHandler(connection);
-    }
-
-
-    public boolean parse(ByteBuffer in)
-    {
-
-        while(in.hasRemaining() && !isDone())
-        {
-            _delegate = _delegate.parse(in);
-
-        }
-        return isDone();
-    }
-
-    public boolean isDone()
-    {
-        return _delegate.isDone();
-    }
-
-
-    // ----------------------------------------------------------------
-
-    public static class FrameOutput<T> implements FrameOutputHandler<T>, FrameSource
-    {
-
-        private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.wrap(new byte[0]);
-        private final BlockingQueue<AMQFrame<T>> _queue = new ArrayBlockingQueue<AMQFrame<T>>(100);
-        private ConnectionEndpoint _conn;
-
-        private final AMQFrame<T> _endOfFrameMarker = new AMQFrame<T>(null)
-        {
-            @Override public short getChannel()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override public byte getFrameType()
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
-
-        private boolean _setForClose;
-        private boolean _closed;
-        private long _nextHeartbeat;
-
-        public FrameOutput(final ConnectionEndpoint conn)
-        {
-            _conn = conn;
-        }
-
-        public boolean canSend()
-        {
-            return _queue.remainingCapacity() != 0;
-        }
-
-        public void send(AMQFrame<T> frame)
-        {
-            send(frame, null);
-        }
-
-        public void send(final AMQFrame<T> frame, final ByteBuffer payload)
-        {
-            synchronized(_conn.getLock())
-            {
-                try
-                {
-// TODO HACK - check frame length
-                    int size = _conn.getDescribedTypeRegistry()
-                            .getValueWriter(frame.getFrameBody()).writeToBuffer(EMPTY_BYTEBUFFER) + 8;
-
-                    if(size > _conn.getMaxFrameSize())
-                    {
-                        throw new OversizeFrameException(frame, size);
-                    }
-
-                    while(!_queue.offer(frame))
-                    {
-                        _conn.getLock().wait(1000L);
-
-                    }
-                    _conn.getLock().notifyAll();
-                }
-                catch (InterruptedException e)
-                {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-        }
-
-
-        public void close()
-        {
-            synchronized (_conn.getLock())
-            {
-                if(!_queue.offer(_endOfFrameMarker))
-                {
-                    _setForClose = true;
-                }
-                _conn.getLock().notifyAll();
-            }
-        }
-
-        public AMQFrame<T> getNextFrame(final boolean wait)
-        {
-            synchronized(_conn.getLock())
-            {
-                long time = System.currentTimeMillis();
-                try
-                {
-                    AMQFrame frame = null;
-                    while(!closed() && (frame = _queue.poll()) == null && wait)
-                    {
-                        _conn.getLock().wait(_conn.getIdleTimeout()/2);
-
-                        if(_conn.getIdleTimeout()>0)
-                        {
-                            time = System.currentTimeMillis();
-
-                            if(frame == null && time > _nextHeartbeat)
-                            {
-                                frame = new TransportFrame((short) 0,null);
-                                break;
-                            }
-                        }
-                    }
-
-
-
-
-                    if(frame != null)
-                    {
-                        _nextHeartbeat = time + _conn.getIdleTimeout()/2;
-
-                    }
-                    if(frame == _endOfFrameMarker)
-                    {
-                        _closed = true;
-                        frame = null;
-                    }
-                    else if(_setForClose && frame != null)
-                    {
-                        _setForClose = !_queue.offer(_endOfFrameMarker);
-                    }
-
-
-                    if(frame != null && FRAME_LOGGER.isLoggable(Level.FINE))
-                    {
-                        FRAME_LOGGER.fine("SEND[" + _conn.getRemoteAddress() + "|" + frame.getChannel() + "] : " + frame.getFrameBody());
-                    }
-
-                    _conn.getLock().notifyAll();
-
-                    return frame;
-                }
-                catch (InterruptedException e)
-                {
-                    _conn.setClosedForOutput(true);
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    return null;
-                }
-            }
-        }
-
-        public boolean closed()
-        {
-            return _closed;
-        }
-    }
-
-    public static interface FrameSource<T>
-    {
-        AMQFrame<T> getNextFrame(boolean wait);
-        boolean closed();
-    }
-
-
-    public static interface BytesSource
-    {
-        void getBytes(BytesProcessor processor, boolean wait);
-        boolean closed();
-    }
-
-    public static class FrameToBytesSourceAdapter implements BytesSource
-    {
-
-        private final FrameSource _frameSource;
-        private final FrameWriter _writer;
-        private static final int BUF_SIZE = 1<<16;
-        private final byte[] _bytes = new byte[BUF_SIZE];
-        private final ByteBuffer _buffer = ByteBuffer.wrap(_bytes);
-
-        public FrameToBytesSourceAdapter(final FrameSource frameSource, ValueWriter.Registry registry)
-        {
-            _frameSource = frameSource;
-            _writer =  new FrameWriter(registry);
-        }
-
-        public void getBytes(final BytesProcessor processor, final boolean wait)
-        {
-
-            AMQFrame frame;
-
-            if(_buffer.position() == 0 && !_frameSource.closed())
-            {
-                if(!_writer.isComplete())
-                {
-                    _writer.writeToBuffer(_buffer);
-                }
-
-                while(_buffer.hasRemaining())
-                {
-
-                    if((frame = _frameSource.getNextFrame(wait && _buffer.position()==0)) != null)
-                    {
-                        _writer.setValue(frame);
-
-                        try
-                        {
-                        _writer.writeToBuffer(_buffer);
-                        }
-                        catch(RuntimeException e)
-                        {
-                            e.printStackTrace();
-                            throw e;
-                        }
-                        catch(Error e)
-                        {
-                            e.printStackTrace();
-                            throw e;
-                        }
-
-
-                    }
-                    else
-                    {
-                        break;
-                    }
-                }
-                _buffer.flip();
-            }
-            if(_buffer.limit() != 0)
-            {
-                processor.processBytes(_buffer);
-                if(_buffer.remaining() == 0)
-                {
-                    _buffer.clear();
-                }
-            }
-        }
-
-        public boolean closed()
-        {
-            return _buffer.position() == 0 && _frameSource.closed();
-        }
-    }
-
-
-    public static class HeaderBytesSource implements BytesSource
-    {
-
-        private final ByteBuffer _buffer;
-        private ConnectionEndpoint _conn;
-
-        public HeaderBytesSource(ConnectionEndpoint conn, byte... headerBytes)
-        {
-            _conn = conn;
-            _buffer = ByteBuffer.wrap(headerBytes);
-        }
-
-        public void getBytes(final BytesProcessor processor, final boolean wait)
-        {
-            if(!_conn.closedForOutput())
-            {
-                processor.processBytes(_buffer);
-            }
-        }
-
-        public boolean closed()
-        {
-            return !_conn.closedForOutput() && !_buffer.hasRemaining();
-        }
-    }
-
-    public static class SequentialBytesSource implements BytesSource
-    {
-        private Queue<BytesSource> _sources = new LinkedList<BytesSource>();
-
-        public SequentialBytesSource(BytesSource... sources)
-        {
-            _sources.addAll(Arrays.asList(sources));
-        }
-
-        public synchronized void addSource(BytesSource source)
-        {
-            _sources.add(source);
-        }
-
-        public synchronized void getBytes(final BytesProcessor processor, final boolean wait)
-        {
-            BytesSource src = _sources.peek();
-            while (src != null && src.closed())
-            {
-                _sources.poll();
-                src = _sources.peek();
-            }
-
-            if(src != null)
-            {
-                src.getBytes(processor, wait);
-            }
-        }
-
-        public boolean closed()
-        {
-            return _sources.isEmpty();
-        }
-    }
-
-
-    public static class BytesOutputHandler implements Runnable, BytesProcessor
-    {
-
-        private final OutputStream _outputStream;
-        private BytesSource _bytesSource;
-        private boolean _closed;
-        private ConnectionEndpoint _conn;
-
-        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn)
-            {
-                _outputStream = outputStream;
-                _bytesSource = source;
-                _conn = conn;
-            }
-
-            public void run()
-            {
-
-                final BytesSource bytesSource = _bytesSource;
-
-                while(!(_closed || bytesSource.closed()))
-                {
-                    _bytesSource.getBytes(this, true);
-                }
-
-            }
-
-        public void processBytes(final ByteBuffer buf)
-        {
-            try
-            {
-                if(RAW_LOGGER.isLoggable(Level.FINE))
-                {
-                    Binary bin = new Binary(buf.array(),buf.arrayOffset()+buf.position(), buf.limit()-buf.position());
-                    RAW_LOGGER.fine("SEND["+ _conn.getRemoteAddress() +"] : " + bin.toString());
-                }
-                _outputStream.write(buf.array(),buf.arrayOffset()+buf.position(), buf.limit()-buf.position());
-                buf.position(buf.limit());
-            }
-            catch (IOException e)
-            {
-                _closed = true;
-                e.printStackTrace();  //TODO
-            }
-        }
-    }
-
-
-    public static class OutputHandler implements Runnable
-    {
-
-
-
-        private final OutputStream _outputStream;
-        private FrameSource _frameSource;
-
-        private static final int BUF_SIZE = 1<<16;
-        private ValueWriter.Registry _registry;
-
-
-        public OutputHandler(OutputStream outputStream, FrameSource source, ValueWriter.Registry registry)
-        {
-            _outputStream = outputStream;
-            _frameSource = source;
-            _registry = registry;
-        }
-
-        public void run()
-        {
-            int i=0;
-
-
-            try
-            {
-
-                byte[] buffer = new byte[BUF_SIZE];
-                ByteBuffer buf = ByteBuffer.wrap(buffer);
-
-                buf.put((byte)'A');
-                buf.put((byte)'M');
-                buf.put((byte)'Q');
-                buf.put((byte)'P');
-                buf.put((byte) 0);
-                buf.put((byte) 1);
-                buf.put((byte) 0);
-                buf.put((byte) 0);
-
-
-
-                final FrameSource frameSource = _frameSource;
-
-                AMQFrame frame;
-                FrameWriter writer =  new FrameWriter(_registry);
-
-                while(!frameSource.closed())
-                {
-
-                    if(!writer.isComplete())
-                    {
-                        writer.writeToBuffer(buf);
-                    }
-
-                    while(buf.hasRemaining())
-                    {
-
-                        if((frame = frameSource.getNextFrame(buf.position()==0)) != null)
-                        {
-                            writer.setValue(frame);
-
-                            int size = writer.writeToBuffer(buf);
-
-                        }
-                        else
-                        {
-                            break;
-                        }
-                    }
-
-                    if(buf.limit() != 0)
-                    {
-                        _outputStream.write(buffer,0, buf.position());
-                        buf.clear();
-                    }
-                }
-
-            }
-            catch (IOException e)
-            {
-                e.printStackTrace();
-            }
-
-        }
-    }
-
-    public static void main(String[] args) throws AmqpErrorException
-    {
-        byte[] buffer = new byte[76];
-        ByteBuffer buf = ByteBuffer.wrap(buffer);
-        AMQPDescribedTypeRegistry registry = AMQPDescribedTypeRegistry.newInstance()
-                .registerTransportLayer()
-                .registerMessagingLayer()
-                .registerTransactionLayer();
-
-        Open open = new Open();
-        // Open(container_id="venture", channel_max=10, hostname="foo", offered_capabilities=[Symbol("one"), Symbol("two"), Symbol("three")])
-        open.setContainerId("venture");
-        open.setChannelMax(UnsignedShort.valueOf((short) 10));
-        open.setHostname("foo");
-        open.setOfferedCapabilities(new Symbol[] {Symbol.valueOf("one"),Symbol.valueOf("two"),Symbol.valueOf("three")});
-
-        ValueWriter<Open> writer = registry.getValueWriter(open);
-
-        System.out.println("------ Encode (time in ms for 1 million opens)");
-        Long myLong = Long.valueOf(32);
-        ValueWriter<Long> writer2 = registry.getValueWriter(myLong);
-        Double myDouble = Double.valueOf(3.14159265359);
-        ValueWriter<Double> writer3 = registry.getValueWriter(myDouble);
-        for(int n = 0; n < 1/*00*/; n++)
-        {
-            long startTime = System.currentTimeMillis();
-            for(int i = 1/*000000*/; i !=0; i--)
-            {
-                buf.position(0);
-                writer.setValue(open);
-                writer.writeToBuffer(buf);
-                writer2.setValue(myLong);
-                writer.writeToBuffer(buf);
-                writer3.setValue(myDouble);
-                writer3.writeToBuffer(buf);
-
-
-            }
-            long midTime = System.currentTimeMillis();
-            System.out.println((midTime - startTime));
-
-        }
-
-
-        ValueHandler handler = new ValueHandler(registry);
-        System.out.println("------ Decode (time in ms for 1 million opens)");
-        for(int n = 0; n < 100; n++)
-        {
-            long startTime = System.currentTimeMillis();
-            for(int i = 1000000; i !=0; i--)
-            {
-                buf.flip();
-                handler.parse(buf);
-                handler.parse(buf);
-                handler.parse(buf);
-
-            }
-            long midTime = System.currentTimeMillis();
-            System.out.println((midTime - startTime));
-        }
-
-
-    }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.framing;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
+import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
+import org.apache.qpid.amqp_1_0.codec.ValueHandler;
+import org.apache.qpid.amqp_1_0.codec.ValueWriter;
+import org.apache.qpid.amqp_1_0.transport.BytesProcessor;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.transport.Open;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedShort;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ConnectionHandler
+{
+    private final ConnectionEndpoint _connection;
+    private ProtocolHandler _delegate;
+
+    private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+    private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+
+    public ConnectionHandler(final ConnectionEndpoint connection)
+    {
+        _connection = connection;
+        _delegate = new ProtocolHeaderHandler(connection);
+    }
+
+
+    public boolean parse(ByteBuffer in)
+    {
+
+        while(in.hasRemaining() && !isDone())
+        {
+            _delegate = _delegate.parse(in);
+
+        }
+        return isDone();
+    }
+
+    public boolean isDone()
+    {
+        return _delegate.isDone();
+    }
+
+
+    // ----------------------------------------------------------------
+
+    public static class FrameOutput<T> implements FrameOutputHandler<T>, FrameSource
+    {
+
+        private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.wrap(new byte[0]);
+        private final BlockingQueue<AMQFrame<T>> _queue = new ArrayBlockingQueue<AMQFrame<T>>(100);
+        private ConnectionEndpoint _conn;
+
+        private final AMQFrame<T> _endOfFrameMarker = new AMQFrame<T>(null)
+        {
+            @Override public short getChannel()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override public byte getFrameType()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+
+        private boolean _setForClose;
+        private boolean _closed;
+        private long _nextHeartbeat;
+
+        public FrameOutput(final ConnectionEndpoint conn)
+        {
+            _conn = conn;
+        }
+
+        public boolean canSend()
+        {
+            return _queue.remainingCapacity() != 0;
+        }
+
+        public void send(AMQFrame<T> frame)
+        {
+            send(frame, null);
+        }
+
+        public void send(final AMQFrame<T> frame, final ByteBuffer payload)
+        {
+            synchronized(_conn.getLock())
+            {
+                try
+                {
+// TODO HACK - check frame length
+                    int size = _conn.getDescribedTypeRegistry()
+                            .getValueWriter(frame.getFrameBody()).writeToBuffer(EMPTY_BYTEBUFFER) + 8;
+
+                    if(size > _conn.getMaxFrameSize())
+                    {
+                        throw new OversizeFrameException(frame, size);
+                    }
+
+                    while(!_queue.offer(frame))
+                    {
+                        _conn.getLock().wait(1000L);
+
+                    }
+                    _conn.getLock().notifyAll();
+                }
+                catch (InterruptedException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+        }
+
+
+        public void close()
+        {
+            synchronized (_conn.getLock())
+            {
+                if(!_queue.offer(_endOfFrameMarker))
+                {
+                    _setForClose = true;
+                }
+                _conn.getLock().notifyAll();
+            }
+        }
+
+        public AMQFrame<T> getNextFrame(final boolean wait)
+        {
+            synchronized(_conn.getLock())
+            {
+                long time = System.currentTimeMillis();
+                try
+                {
+                    AMQFrame frame = null;
+                    while(!closed() && (frame = _queue.poll()) == null && wait)
+                    {
+                        _conn.getLock().wait(_conn.getIdleTimeout()/2);
+
+                        if(_conn.getIdleTimeout()>0)
+                        {
+                            time = System.currentTimeMillis();
+
+                            if(frame == null && time > _nextHeartbeat)
+                            {
+                                frame = new TransportFrame((short) 0,null);
+                                break;
+                            }
+                        }
+                    }
+
+
+
+
+                    if(frame != null)
+                    {
+                        _nextHeartbeat = time + _conn.getIdleTimeout()/2;
+
+                    }
+                    if(frame == _endOfFrameMarker)
+                    {
+                        _closed = true;
+                        frame = null;
+                    }
+                    else if(_setForClose && frame != null)
+                    {
+                        _setForClose = !_queue.offer(_endOfFrameMarker);
+                    }
+
+
+                    if(frame != null && FRAME_LOGGER.isLoggable(Level.FINE))
+                    {
+                        FRAME_LOGGER.fine("SEND[" + _conn.getRemoteAddress() + "|" + frame.getChannel() + "] : " + frame.getFrameBody());
+                    }
+
+                    _conn.getLock().notifyAll();
+
+                    return frame;
+                }
+                catch (InterruptedException e)
+                {
+                    _conn.setClosedForOutput(true);
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    return null;
+                }
+            }
+        }
+
+        public boolean closed()
+        {
+            return _closed;
+        }
+    }
+
+    public static interface FrameSource<T>
+    {
+        AMQFrame<T> getNextFrame(boolean wait);
+        boolean closed();
+    }
+
+
+    public static interface BytesSource
+    {
+        void getBytes(BytesProcessor processor, boolean wait);
+        boolean closed();
+    }
+
+    public static class FrameToBytesSourceAdapter implements BytesSource
+    {
+
+        private final FrameSource _frameSource;
+        private final FrameWriter _writer;
+        private static final int BUF_SIZE = 1<<16;
+        private final byte[] _bytes = new byte[BUF_SIZE];
+        private final ByteBuffer _buffer = ByteBuffer.wrap(_bytes);
+
+        public FrameToBytesSourceAdapter(final FrameSource frameSource, ValueWriter.Registry registry)
+        {
+            _frameSource = frameSource;
+            _writer =  new FrameWriter(registry);
+        }
+
+        public void getBytes(final BytesProcessor processor, final boolean wait)
+        {
+
+            AMQFrame frame;
+
+            if(_buffer.position() == 0 && !_frameSource.closed())
+            {
+                if(!_writer.isComplete())
+                {
+                    _writer.writeToBuffer(_buffer);
+                }
+
+                while(_buffer.hasRemaining())
+                {
+
+                    if((frame = _frameSource.getNextFrame(wait && _buffer.position()==0)) != null)
+                    {
+                        _writer.setValue(frame);
+
+                        try
+                        {
+                        _writer.writeToBuffer(_buffer);
+                        }
+                        catch(RuntimeException e)
+                        {
+                            e.printStackTrace();
+                            throw e;
+                        }
+                        catch(Error e)
+                        {
+                            e.printStackTrace();
+                            throw e;
+                        }
+
+
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+                _buffer.flip();
+            }
+            if(_buffer.limit() != 0)
+            {
+                processor.processBytes(_buffer);
+                if(_buffer.remaining() == 0)
+                {
+                    _buffer.clear();
+                }
+            }
+        }
+
+        public boolean closed()
+        {
+            return _buffer.position() == 0 && _frameSource.closed();
+        }
+    }
+
+
+    public static class HeaderBytesSource implements BytesSource
+    {
+
+        private final ByteBuffer _buffer;
+        private ConnectionEndpoint _conn;
+
+        public HeaderBytesSource(ConnectionEndpoint conn, byte... headerBytes)
+        {
+            _conn = conn;
+            _buffer = ByteBuffer.wrap(headerBytes);
+        }
+
+        public void getBytes(final BytesProcessor processor, final boolean wait)
+        {
+            if(!_conn.closedForOutput())
+            {
+                processor.processBytes(_buffer);
+            }
+        }
+
+        public boolean closed()
+        {
+            return !_conn.closedForOutput() && !_buffer.hasRemaining();
+        }
+    }
+
+    public static class SequentialBytesSource implements BytesSource
+    {
+        private Queue<BytesSource> _sources = new LinkedList<BytesSource>();
+
+        public SequentialBytesSource(BytesSource... sources)
+        {
+            _sources.addAll(Arrays.asList(sources));
+        }
+
+        public synchronized void addSource(BytesSource source)
+        {
+            _sources.add(source);
+        }
+
+        public synchronized void getBytes(final BytesProcessor processor, final boolean wait)
+        {
+            BytesSource src = _sources.peek();
+            while (src != null && src.closed())
+            {
+                _sources.poll();
+                src = _sources.peek();
+            }
+
+            if(src != null)
+            {
+                src.getBytes(processor, wait);
+            }
+        }
+
+        public boolean closed()
+        {
+            return _sources.isEmpty();
+        }
+    }
+
+
+    public static class BytesOutputHandler implements Runnable, BytesProcessor
+    {
+
+        private final OutputStream _outputStream;
+        private BytesSource _bytesSource;
+        private boolean _closed;
+        private ConnectionEndpoint _conn;
+
+        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn)
+            {
+                _outputStream = outputStream;
+                _bytesSource = source;
+                _conn = conn;
+            }
+
+            public void run()
+            {
+
+                final BytesSource bytesSource = _bytesSource;
+
+                while(!(_closed || bytesSource.closed()))
+                {
+                    _bytesSource.getBytes(this, true);
+                }
+
+            }
+
+        public void processBytes(final ByteBuffer buf)
+        {
+            try
+            {
+                if(RAW_LOGGER.isLoggable(Level.FINE))
+                {
+                    Binary bin = new Binary(buf.array(),buf.arrayOffset()+buf.position(), buf.limit()-buf.position());
+                    RAW_LOGGER.fine("SEND["+ _conn.getRemoteAddress() +"] : " + bin.toString());
+                }
+                _outputStream.write(buf.array(),buf.arrayOffset()+buf.position(), buf.limit()-buf.position());
+                buf.position(buf.limit());
+            }
+            catch (IOException e)
+            {
+                _closed = true;
+                e.printStackTrace();  //TODO
+            }
+        }
+    }
+
+
+    public static class OutputHandler implements Runnable
+    {
+
+
+
+        private final OutputStream _outputStream;
+        private FrameSource _frameSource;
+
+        private static final int BUF_SIZE = 1<<16;
+        private ValueWriter.Registry _registry;
+
+
+        public OutputHandler(OutputStream outputStream, FrameSource source, ValueWriter.Registry registry)
+        {
+            _outputStream = outputStream;
+            _frameSource = source;
+            _registry = registry;
+        }
+
+        public void run()
+        {
+            int i=0;
+
+
+            try
+            {
+
+                byte[] buffer = new byte[BUF_SIZE];
+                ByteBuffer buf = ByteBuffer.wrap(buffer);
+
+                buf.put((byte)'A');
+                buf.put((byte)'M');
+                buf.put((byte)'Q');
+                buf.put((byte)'P');
+                buf.put((byte) 0);
+                buf.put((byte) 1);
+                buf.put((byte) 0);
+                buf.put((byte) 0);
+
+
+
+                final FrameSource frameSource = _frameSource;
+
+                AMQFrame frame;
+                FrameWriter writer =  new FrameWriter(_registry);
+
+                while(!frameSource.closed())
+                {
+
+                    if(!writer.isComplete())
+                    {
+                        writer.writeToBuffer(buf);
+                    }
+
+                    while(buf.hasRemaining())
+                    {
+
+                        if((frame = frameSource.getNextFrame(buf.position()==0)) != null)
+                        {
+                            writer.setValue(frame);
+
+                            int size = writer.writeToBuffer(buf);
+
+                        }
+                        else
+                        {
+                            break;
+                        }
+                    }
+
+                    if(buf.limit() != 0)
+                    {
+                        _outputStream.write(buffer,0, buf.position());
+                        buf.clear();
+                    }
+                }
+
+            }
+            catch (IOException e)
+            {
+                e.printStackTrace();
+            }
+
+        }
+    }
+
+    public static void main(String[] args) throws AmqpErrorException
+    {
+        byte[] buffer = new byte[76];
+        ByteBuffer buf = ByteBuffer.wrap(buffer);
+        AMQPDescribedTypeRegistry registry = AMQPDescribedTypeRegistry.newInstance()
+                .registerTransportLayer()
+                .registerMessagingLayer()
+                .registerTransactionLayer();
+
+        Open open = new Open();
+        // Open(container_id="venture", channel_max=10, hostname="foo", offered_capabilities=[Symbol("one"), Symbol("two"), Symbol("three")])
+        open.setContainerId("venture");
+        open.setChannelMax(UnsignedShort.valueOf((short) 10));
+        open.setHostname("foo");
+        open.setOfferedCapabilities(new Symbol[] {Symbol.valueOf("one"),Symbol.valueOf("two"),Symbol.valueOf("three")});
+
+        ValueWriter<Open> writer = registry.getValueWriter(open);
+
+        System.out.println("------ Encode (time in ms for 1 million opens)");
+        Long myLong = Long.valueOf(32);
+        ValueWriter<Long> writer2 = registry.getValueWriter(myLong);
+        Double myDouble = Double.valueOf(3.14159265359);
+        ValueWriter<Double> writer3 = registry.getValueWriter(myDouble);
+        for(int n = 0; n < 1/*00*/; n++)
+        {
+            long startTime = System.currentTimeMillis();
+            for(int i = 1/*000000*/; i !=0; i--)
+            {
+                buf.position(0);
+                writer.setValue(open);
+                writer.writeToBuffer(buf);
+                writer2.setValue(myLong);
+                writer.writeToBuffer(buf);
+                writer3.setValue(myDouble);
+                writer3.writeToBuffer(buf);
+
+
+            }
+            long midTime = System.currentTimeMillis();
+            System.out.println((midTime - startTime));
+
+        }
+
+
+        ValueHandler handler = new ValueHandler(registry);
+        System.out.println("------ Decode (time in ms for 1 million opens)");
+        for(int n = 0; n < 100; n++)
+        {
+            long startTime = System.currentTimeMillis();
+            for(int i = 1000000; i !=0; i--)
+            {
+                buf.flip();
+                handler.parse(buf);
+                handler.parse(buf);
+                handler.parse(buf);
+
+            }
+            long midTime = System.currentTimeMillis();
+            System.out.println((midTime - startTime));
+        }
+
+
+    }
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java Sun Mar 17 18:03:37 2013
@@ -1,329 +1,329 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.framing;
-
-import org.apache.qpid.amqp_1_0.codec.BinaryWriter;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.ErrorCondition;
-import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.nio.ByteBuffer;
-import java.util.Formatter;
-
-public class FrameHandler implements ProtocolHandler
-{
-    private ConnectionEndpoint _connection;
-    private ValueHandler _typeHandler;
-
-    enum State {
-        SIZE_0,
-        SIZE_1,
-        SIZE_2,
-        SIZE_3,
-        PRE_PARSE,
-        BUFFERING,
-        PARSING,
-        ERROR
-    }
-
-    private State _state = State.SIZE_0;
-    private int _size;
-
-    private ByteBuffer _buffer;
-
-
-
-    public FrameHandler(final ConnectionEndpoint connection)
-    {
-        _connection = connection;
-        _typeHandler = new ValueHandler(connection.getDescribedTypeRegistry());
-
-    }
-
-    public ProtocolHandler parse(ByteBuffer in)
-    {
-        try
-        {
-        Error frameParsingError = null;
-        int size = _size;
-        State state = _state;
-        ByteBuffer oldIn = null;
-
-        while(in.hasRemaining() && state != State.ERROR)
-        {
-
-            final int remaining = in.remaining();
-            if(remaining == 0)
-            {
-                return this;
-            }
-
-
-            switch(state)
-            {
-                case SIZE_0:
-                    if(remaining >= 4)
-                    {
-                        size = in.getInt();
-                        state = State.PRE_PARSE;
-                        break;
-                    }
-                    else
-                    {
-                        size = (in.get() << 24) & 0xFF000000;
-                        if(!in.hasRemaining())
-                        {
-                            state = State.SIZE_1;
-                            break;
-                        }
-                    }
-                case SIZE_1:
-                    size |= (in.get() << 16) & 0xFF0000;
-                    if(!in.hasRemaining())
-                    {
-                        state = State.SIZE_2;
-                        break;
-                    }
-                case SIZE_2:
-                    size |= (in.get() << 8) & 0xFF00;
-                    if(!in.hasRemaining())
-                    {
-                        state = State.SIZE_3;
-                        break;
-                    }
-                case SIZE_3:
-                    size |= in.get() & 0xFF;
-                    state = State.PRE_PARSE;
-
-                case PRE_PARSE:
-
-                    if(size < 8)
-                    {
-                        frameParsingError = createFramingError("specified frame size %d smaller than minimum frame header size %d", _size, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    else if(size > _connection.getMaxFrameSize())
-                    {
-                        frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getMaxFrameSize());
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    if(in.remaining() < size-4)
-                    {
-                        _buffer = ByteBuffer.allocate(size-4);
-                        _buffer.put(in);
-                        state = State.BUFFERING;
-                        break;
-                    }
-                case BUFFERING:
-                    if(_buffer != null)
-                    {
-                        if(in.remaining() < _buffer.remaining())
-                        {
-                            _buffer.put(in);
-                            break;
-                        }
-                        else
-                        {
-                            ByteBuffer dup = in.duplicate();
-                            dup.limit(dup.position()+_buffer.remaining());
-                            int i = _buffer.remaining();
-                            int d = dup.remaining();
-                            in.position(in.position()+_buffer.remaining());
-                            _buffer.put(dup);
-                            oldIn = in;
-                            _buffer.flip();
-                            in = _buffer;
-                            state = State.PARSING;
-                        }
-                    }
-
-                case PARSING:
-
-                    int dataOffset = (in.get() << 2) & 0x3FF;
-
-                    if(dataOffset < 8)
-                    {
-                        frameParsingError = createFramingError("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-                    else if(dataOffset > size)
-                    {
-                        frameParsingError = createFramingError("specified frame data offset %d larger than the frame size %d", dataOffset, _size);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    // type
-
-                    int type = in.get() & 0xFF;
-                    int channel = in.getShort() & 0xFF;
-
-                    if(type != 0 && type != 1)
-                    {
-                        frameParsingError = createFramingError("unknown frame type: %d", type);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    // channel
-
-                    /*if(channel > _connection.getChannelMax())
-                    {
-                        frameParsingError = createError(AmqpError.DECODE_ERROR,
-                                                        "frame received on invalid channel %d above channel-max %d",
-                                                        channel, _connection.getChannelMax());
-
-                        state = State.ERROR;
-                    }
-*/
-                    // ext header
-                    if(dataOffset!=8)
-                    {
-                        in.position(in.position()+dataOffset-8);
-                    }
-
-                    // oldIn null iff not working on duplicated buffer
-                    if(oldIn == null)
-                    {
-                        oldIn = in;
-                        in = in.duplicate();
-                        final int endPos = in.position() + size - dataOffset;
-                        in.limit(endPos);
-                        oldIn.position(endPos);
-
-                    }
-
-                    int inPos = in.position();
-                    int inLimit = in.limit();
-                    // PARSE HERE
-                    try
-                    {
-                        Object val = _typeHandler.parse(in);
-
-                        if(in.hasRemaining())
-                        {
-                            if(val instanceof Transfer)
-                            {
-                                ByteBuffer buf = ByteBuffer.allocate(in.remaining());
-                                buf.put(in);
-                                buf.flip();
-                                ((Transfer)val).setPayload(buf);
-                            }
-                        }
-
-                        _connection.receive((short)channel,val);
-                        reset();
-                        in = oldIn;
-                        oldIn = null;
-                        _buffer = null;
-                        state = State.SIZE_0;
-                        break;
-
-
-                    }
-                    catch (AmqpErrorException ex)
-                    {
-                        state = State.ERROR;
-                        frameParsingError = ex.getError();
-                    }
-                    catch (RuntimeException e)
-                    {
-                        in.position(inPos);
-                        in.limit(inLimit);
-                        System.err.println(toHex(in));
-                        throw e;
-                    }
-            }
-
-        }
-
-        _state = state;
-        _size = size;
-
-        if(_state == State.ERROR)
-        {
-            _connection.handleError(frameParsingError);
-        }
-        return this;
-        }
-        catch(RuntimeException e)
-        {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    private static String toHex(ByteBuffer in)
-    {
-        Formatter formatter = new Formatter();
-        int count = 0;
-        while(in.hasRemaining())
-        {
-            formatter.format("%02x ", in.get() & 0xFF);
-            if(count++ == 16)
-            {
-                formatter.format("\n");
-                count = 0;
-            }
-
-        }
-        return formatter.toString();
-    }
-
-    private Error createFramingError(String description, Object... args)
-    {
-        return createError(ConnectionError.FRAMING_ERROR, description, args);
-    }
-
-    private Error createError(final ErrorCondition framingError,
-                              final String description,
-                              final Object... args)
-    {
-        Error error = new Error();
-        error.setCondition(framingError);
-        Formatter formatter = new Formatter();
-        error.setDescription(formatter.format(description, args).toString());
-        return error;
-    }
-
-
-    private void reset()
-    {
-        _size = 0;
-        _state = State.SIZE_0;
-    }
-
-
-    public boolean isDone()
-    {
-        return _state == State.ERROR || _connection.closedForInput();
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.framing;
+
+import org.apache.qpid.amqp_1_0.codec.BinaryWriter;
+import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
+import org.apache.qpid.amqp_1_0.codec.ValueHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.ErrorCondition;
+import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import java.nio.ByteBuffer;
+import java.util.Formatter;
+
+public class FrameHandler implements ProtocolHandler
+{
+    private ConnectionEndpoint _connection;
+    private ValueHandler _typeHandler;
+
+    enum State {
+        SIZE_0,
+        SIZE_1,
+        SIZE_2,
+        SIZE_3,
+        PRE_PARSE,
+        BUFFERING,
+        PARSING,
+        ERROR
+    }
+
+    private State _state = State.SIZE_0;
+    private int _size;
+
+    private ByteBuffer _buffer;
+
+
+
+    public FrameHandler(final ConnectionEndpoint connection)
+    {
+        _connection = connection;
+        _typeHandler = new ValueHandler(connection.getDescribedTypeRegistry());
+
+    }
+
+    public ProtocolHandler parse(ByteBuffer in)
+    {
+        try
+        {
+        Error frameParsingError = null;
+        int size = _size;
+        State state = _state;
+        ByteBuffer oldIn = null;
+
+        while(in.hasRemaining() && state != State.ERROR)
+        {
+
+            final int remaining = in.remaining();
+            if(remaining == 0)
+            {
+                return this;
+            }
+
+
+            switch(state)
+            {
+                case SIZE_0:
+                    if(remaining >= 4)
+                    {
+                        size = in.getInt();
+                        state = State.PRE_PARSE;
+                        break;
+                    }
+                    else
+                    {
+                        size = (in.get() << 24) & 0xFF000000;
+                        if(!in.hasRemaining())
+                        {
+                            state = State.SIZE_1;
+                            break;
+                        }
+                    }
+                case SIZE_1:
+                    size |= (in.get() << 16) & 0xFF0000;
+                    if(!in.hasRemaining())
+                    {
+                        state = State.SIZE_2;
+                        break;
+                    }
+                case SIZE_2:
+                    size |= (in.get() << 8) & 0xFF00;
+                    if(!in.hasRemaining())
+                    {
+                        state = State.SIZE_3;
+                        break;
+                    }
+                case SIZE_3:
+                    size |= in.get() & 0xFF;
+                    state = State.PRE_PARSE;
+
+                case PRE_PARSE:
+
+                    if(size < 8)
+                    {
+                        frameParsingError = createFramingError("specified frame size %d smaller than minimum frame header size %d", _size, 8);
+                        state = State.ERROR;
+                        break;
+                    }
+
+                    else if(size > _connection.getMaxFrameSize())
+                    {
+                        frameParsingError = createFramingError("specified frame size %d larger than maximum frame header size %d", size, _connection.getMaxFrameSize());
+                        state = State.ERROR;
+                        break;
+                    }
+
+                    if(in.remaining() < size-4)
+                    {
+                        _buffer = ByteBuffer.allocate(size-4);
+                        _buffer.put(in);
+                        state = State.BUFFERING;
+                        break;
+                    }
+                case BUFFERING:
+                    if(_buffer != null)
+                    {
+                        if(in.remaining() < _buffer.remaining())
+                        {
+                            _buffer.put(in);
+                            break;
+                        }
+                        else
+                        {
+                            ByteBuffer dup = in.duplicate();
+                            dup.limit(dup.position()+_buffer.remaining());
+                            int i = _buffer.remaining();
+                            int d = dup.remaining();
+                            in.position(in.position()+_buffer.remaining());
+                            _buffer.put(dup);
+                            oldIn = in;
+                            _buffer.flip();
+                            in = _buffer;
+                            state = State.PARSING;
+                        }
+                    }
+
+                case PARSING:
+
+                    int dataOffset = (in.get() << 2) & 0x3FF;
+
+                    if(dataOffset < 8)
+                    {
+                        frameParsingError = createFramingError("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
+                        state = State.ERROR;
+                        break;
+                    }
+                    else if(dataOffset > size)
+                    {
+                        frameParsingError = createFramingError("specified frame data offset %d larger than the frame size %d", dataOffset, _size);
+                        state = State.ERROR;
+                        break;
+                    }
+
+                    // type
+
+                    int type = in.get() & 0xFF;
+                    int channel = in.getShort() & 0xFF;
+
+                    if(type != 0 && type != 1)
+                    {
+                        frameParsingError = createFramingError("unknown frame type: %d", type);
+                        state = State.ERROR;
+                        break;
+                    }
+
+                    // channel
+
+                    /*if(channel > _connection.getChannelMax())
+                    {
+                        frameParsingError = createError(AmqpError.DECODE_ERROR,
+                                                        "frame received on invalid channel %d above channel-max %d",
+                                                        channel, _connection.getChannelMax());
+
+                        state = State.ERROR;
+                    }
+*/
+                    // ext header
+                    if(dataOffset!=8)
+                    {
+                        in.position(in.position()+dataOffset-8);
+                    }
+
+                    // oldIn null iff not working on duplicated buffer
+                    if(oldIn == null)
+                    {
+                        oldIn = in;
+                        in = in.duplicate();
+                        final int endPos = in.position() + size - dataOffset;
+                        in.limit(endPos);
+                        oldIn.position(endPos);
+
+                    }
+
+                    int inPos = in.position();
+                    int inLimit = in.limit();
+                    // PARSE HERE
+                    try
+                    {
+                        Object val = _typeHandler.parse(in);
+
+                        if(in.hasRemaining())
+                        {
+                            if(val instanceof Transfer)
+                            {
+                                ByteBuffer buf = ByteBuffer.allocate(in.remaining());
+                                buf.put(in);
+                                buf.flip();
+                                ((Transfer)val).setPayload(buf);
+                            }
+                        }
+
+                        _connection.receive((short)channel,val);
+                        reset();
+                        in = oldIn;
+                        oldIn = null;
+                        _buffer = null;
+                        state = State.SIZE_0;
+                        break;
+
+
+                    }
+                    catch (AmqpErrorException ex)
+                    {
+                        state = State.ERROR;
+                        frameParsingError = ex.getError();
+                    }
+                    catch (RuntimeException e)
+                    {
+                        in.position(inPos);
+                        in.limit(inLimit);
+                        System.err.println(toHex(in));
+                        throw e;
+                    }
+            }
+
+        }
+
+        _state = state;
+        _size = size;
+
+        if(_state == State.ERROR)
+        {
+            _connection.handleError(frameParsingError);
+        }
+        return this;
+        }
+        catch(RuntimeException e)
+        {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    private static String toHex(ByteBuffer in)
+    {
+        Formatter formatter = new Formatter();
+        int count = 0;
+        while(in.hasRemaining())
+        {
+            formatter.format("%02x ", in.get() & 0xFF);
+            if(count++ == 16)
+            {
+                formatter.format("\n");
+                count = 0;
+            }
+
+        }
+        return formatter.toString();
+    }
+
+    private Error createFramingError(String description, Object... args)
+    {
+        return createError(ConnectionError.FRAMING_ERROR, description, args);
+    }
+
+    private Error createError(final ErrorCondition framingError,
+                              final String description,
+                              final Object... args)
+    {
+        Error error = new Error();
+        error.setCondition(framingError);
+        Formatter formatter = new Formatter();
+        error.setDescription(formatter.format(description, args).toString());
+        return error;
+    }
+
+
+    private void reset()
+    {
+        _size = 0;
+        _state = State.SIZE_0;
+    }
+
+
+    public boolean isDone()
+    {
+        return _state == State.ERROR || _connection.closedForInput();
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameParsingError.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameParsingError.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameParsingError.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameParsingError.java Sun Mar 17 18:03:37 2013
@@ -1,34 +1,34 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.framing;
-
-public enum FrameParsingError
-{
-    UNDERSIZED_FRAME_HEADER,
-    OVERSIZED_FRAME_HEADER,
-    DATA_OFFSET_IN_HEADER,
-    DATA_OFFSET_TOO_LARGE,
-    UNKNOWN_FRAME_TYPE,
-    CHANNEL_ID_BEYOND_MAX,
-    SPARE_OCTETS_IN_FRAME_BODY,
-    INSUFFICIENT_OCTETS_IN_FRAME_BODY,
-    UNKNOWN_TYPE_CODE, UNPARSABLE_TYPE;
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.framing;
+
+public enum FrameParsingError
+{
+    UNDERSIZED_FRAME_HEADER,
+    OVERSIZED_FRAME_HEADER,
+    DATA_OFFSET_IN_HEADER,
+    DATA_OFFSET_TOO_LARGE,
+    UNKNOWN_FRAME_TYPE,
+    CHANNEL_ID_BEYOND_MAX,
+    SPARE_OCTETS_IN_FRAME_BODY,
+    INSUFFICIENT_OCTETS_IN_FRAME_BODY,
+    UNKNOWN_TYPE_CODE, UNPARSABLE_TYPE;
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameParsingError.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameTypeHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameTypeHandler.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameTypeHandler.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameTypeHandler.java Sun Mar 17 18:03:37 2013
@@ -1,31 +1,31 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.framing;
-
-import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-
-public interface FrameTypeHandler extends ProtocolHandler
-{
-    void setExtHeaderRemaining(int size);
-
-    void setBodySize(int size);
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.framing;
+
+import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
+
+public interface FrameTypeHandler extends ProtocolHandler
+{
+    void setExtHeaderRemaining(int size);
+
+    void setBodySize(int size);
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameTypeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/OversizeFrameException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrame.java?rev=1457505&r1=1457504&r2=1457505&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrame.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrame.java Sun Mar 17 18:03:37 2013
@@ -1,42 +1,42 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.framing;
-
-import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-
-public final class SASLFrame extends AMQFrame<SaslFrameBody>
-{
-
-    public SASLFrame(SaslFrameBody frameBody)
-    {
-        super(frameBody);
-    }
-
-    @Override public short getChannel()
-    {
-        return (short)0;
-    }
-
-    @Override public byte getFrameType()
-    {
-        return (byte)1;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.framing;
+
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+
+public final class SASLFrame extends AMQFrame<SaslFrameBody>
+{
+
+    public SASLFrame(SaslFrameBody frameBody)
+    {
+        super(frameBody);
+    }
+
+    @Override public short getChannel()
+    {
+        return (short)0;
+    }
+
+    @Override public byte getFrameType()
+    {
+        return (byte)1;
+    }
+
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrame.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message