activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r915018 [1/4] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/commands/ main/activemq/io/ main/activemq/wireformat/openwire/marshal/ main/activemq/wireformat/openwire/utils/ main/decaf/internal/io/ main/decaf/io/ mai...
Date Mon, 22 Feb 2010 19:02:36 GMT
Author: tabish
Date: Mon Feb 22 19:02:35 2010
New Revision: 915018

URL: http://svn.apache.org/viewvc?rev=915018&view=rev
Log:
More refactoring in the I/O layer, adds better error checking, reduces cut and paste code.
Add more unit tests for the I/O layer
Attempts to get a little bit more performance out of the I/O code that does bulk moves and copies.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStream.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStream.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/InputStreamTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/InputStreamTest.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/OutputStreamTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/OutputStreamTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedOutputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayOutputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/DataInput.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/DataInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/DataInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStreamReader.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStreamReader.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStreamWriter.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStreamWriter.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Reader.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Reader.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Writer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Writer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/System.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/System.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketOutputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-benchmarks/decaf/io/ByteArrayOutputStreamBenchmark.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/BufferedInputStreamTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/BufferedInputStreamTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/BufferedOutputStreamTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/DataInputStreamTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/DataOutputStreamTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterInputStreamTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterInputStreamTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterOutputStreamTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterOutputStreamTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/ReaderTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/WriterTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Feb 22 19:02:35 2010
@@ -517,7 +517,11 @@
     decaf/io/ByteArrayOutputStream.cpp \
     decaf/io/DataInputStream.cpp \
     decaf/io/DataOutputStream.cpp \
+    decaf/io/FilterInputStream.cpp \
+    decaf/io/FilterOutputStream.cpp \
+    decaf/io/InputStream.cpp \
     decaf/io/InputStreamReader.cpp \
+    decaf/io/OutputStream.cpp \
     decaf/io/OutputStreamWriter.cpp \
     decaf/io/Reader.cpp \
     decaf/io/Writer.cpp \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp Mon Feb 22 19:02:35 2010
@@ -322,7 +322,7 @@
         if( length <= (size_t)this->remainingBytes ) {
             // small buffer
             this->remainingBytes -= (int)length;
-            this->dataIn->readFully( buffer, 0, length );
+            this->dataIn->readFully( buffer, length );
             return length;
         } else {
             // big buffer

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp Mon Feb 22 19:02:35 2010
@@ -284,10 +284,14 @@
     stream << ", ";
     stream << "BrokerOutTime = " << this->getBrokerOutTime();
 
-    stream << " Value of ackHandler = " << ackHandler.get() << std::endl;
-    stream << " Value of properties = " << this->properties.toString() << std::endl;
-    stream << " Value of readOnlyBody = " << this->readOnlyBody << std::endl;
-    stream << " Value of readOnlyProperties = " << this->readOnlyBody << std::endl;
+    stream << ", ";
+    stream << " ackHandler = " << ackHandler.get() << std::endl;
+    stream << ", ";
+    stream << " properties = " << this->properties.toString() << std::endl;
+    stream << ", ";
+    stream << " readOnlyBody = " << this->readOnlyBody << std::endl;
+    stream << ", ";
+    stream << " readOnlyProperties = " << this->readOnlyBody << std::endl;
     stream << " }";
 
     return stream.str();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -36,10 +36,10 @@
 LoggingInputStream::~LoggingInputStream() {}
 
 ////////////////////////////////////////////////////////////////////////////////
-int LoggingInputStream::read() throw ( IOException ) {
+int LoggingInputStream::doReadByte() throw ( IOException ) {
     try {
 
-        unsigned char c = FilterInputStream::read();
+        unsigned char c = FilterInputStream::doReadByte();
         log( &c, 1 );
         return c;
     }
@@ -48,7 +48,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int LoggingInputStream::read( unsigned char* buffer, std::size_t size, std::size_t offset, std::size_t length )
+int LoggingInputStream::doReadArrayBounded( unsigned char* buffer, std::size_t size,
+                                            std::size_t offset, std::size_t length )
     throw ( decaf::io::IOException,
             decaf::lang::exceptions::IndexOutOfBoundsException,
             decaf::lang::exceptions::NullPointerException ) {
@@ -71,13 +72,15 @@
                 "Given size{%d} - offset{%d} is less than length{%d}.", size, offset, length );
         }
 
-        std::size_t numRead = FilterInputStream::read( buffer, size, offset, length );
+        std::size_t numRead = FilterInputStream::doReadArrayBounded( buffer, size, offset, length );
 
         log( buffer, numRead );
 
         return (int)numRead;
     }
     AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_RETHROW( IndexOutOfBoundsException )
+    AMQ_CATCH_RETHROW( NullPointerException )
     AMQ_CATCHALL_THROW( IOException )
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h Mon Feb 22 19:02:35 2010
@@ -44,26 +44,12 @@
 
         virtual ~LoggingInputStream();
 
-        /**
-         * Reads a single byte from the buffer.  Blocks until
-         * data is available.
-         * @return The next byte.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual int read() throw ( decaf::io::IOException );
+    protected:
 
-        /**
-         * Reads an array of bytes from the buffer.  Blocks until
-         * the requested number of bytes are available.
-         * @param buffer (out) the target buffer.
-         * @param offset the position in the buffer to start at
-         * @param bufferSize the size of the output buffer.
-         * @return The number of bytes read or -1 if EOF is detected
-         * @throws IOException thrown if an error occurs.
-         * @throws NullPointerException if buffer is null
-         */
-        virtual int read( unsigned char* buffer, std::size_t size,
-                          std::size_t offset, std::size_t length )
+        virtual int doReadByte() throw ( decaf::io::IOException );
+
+        virtual int doReadArrayBounded( unsigned char* buffer, std::size_t size,
+                                        std::size_t offset, std::size_t length )
             throw ( decaf::io::IOException,
                     decaf::lang::exceptions::IndexOutOfBoundsException,
                     decaf::lang::exceptions::NullPointerException );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp Mon Feb 22 19:02:35 2010
@@ -36,19 +36,19 @@
 LoggingOutputStream::~LoggingOutputStream() {}
 
 ////////////////////////////////////////////////////////////////////////////////
-void LoggingOutputStream::write( const unsigned char c ) throw ( IOException ) {
+void LoggingOutputStream::doWriteByte( const unsigned char c ) throw ( IOException ) {
     try {
 
         log( &c, 1 );
-        FilterOutputStream::write( c );
+        FilterOutputStream::doWriteByte( c );
     }
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCHALL_THROW( IOException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void LoggingOutputStream::write( const unsigned char* buffer, std::size_t size,
-                                 std::size_t offset, std::size_t length )
+void LoggingOutputStream::doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+                                               std::size_t offset, std::size_t length )
     throw ( decaf::io::IOException,
             decaf::lang::exceptions::NullPointerException,
             decaf::lang::exceptions::IndexOutOfBoundsException ) {
@@ -73,7 +73,7 @@
 
         log( buffer + offset, length );
 
-        FilterOutputStream::write( buffer, size, offset, length );
+        FilterOutputStream::doWriteArrayBounded( buffer, size, offset, length );
     }
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCH_RETHROW( NullPointerException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h Mon Feb 22 19:02:35 2010
@@ -49,33 +49,12 @@
 
         virtual ~LoggingOutputStream();
 
-        /**
-         * Writes a single byte to the output stream.
-         * @param c the byte.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual void write( unsigned char c ) throw ( decaf::io::IOException );
+    protected:
 
-        /**
-         * Writes an array of bytes to the output stream in order starting at buffer[offset]
-         * and proceeding until the number of bytes specified by the length argument are
-         * written or an error occurs.
-         *
-         * @param buffer
-         *      The array of bytes to write.
-         * @param size
-         *      The size of the buffer array passed.
-         * @param offset
-         *      The position to start writing in buffer.
-         * @param length
-         *      The number of bytes from the buffer to be written.
-         *
-         * @throws IOException if an I/O error occurs.
-         * @throws NullPointerException thrown if buffer is Null.
-         * @throws IndexOutOfBoundsException if the offset + length > size.
-         */
-        virtual void write( const unsigned char* buffer, std::size_t size,
-                            std::size_t offset, std::size_t length )
+        virtual void doWriteByte( unsigned char c ) throw ( decaf::io::IOException );
+
+        virtual void doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+                                          std::size_t offset, std::size_t length )
             throw ( decaf::io::IOException,
                     decaf::lang::exceptions::NullPointerException,
                     decaf::lang::exceptions::IndexOutOfBoundsException );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp Mon Feb 22 19:02:35 2010
@@ -694,7 +694,7 @@
         if( bs->readBoolean() ) {
             int size = dataIn->readInt();
             data.resize( size );
-            dataIn->readFully( data );
+            dataIn->readFully( &data[0], data.size() );
         }
 
         return data;
@@ -715,7 +715,7 @@
             int size = dataIn->readInt();
             std::vector<unsigned char> data;
             data.resize( size );
-            dataIn->readFully( data );
+            dataIn->readFully( &data[0], data.size() );
             return data;
         }
 
@@ -736,7 +736,7 @@
     try{
         std::vector<unsigned char> data;
         data.resize( size );
-        dataIn->readFully( data );
+        dataIn->readFully( &data[0], data.size() );
         return data;
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -753,7 +753,7 @@
     try{
         std::vector<unsigned char> data;
         data.resize( size );
-        dataIn->readFully( data );
+        dataIn->readFully( &data[0], data.size() );
         return data;
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -815,16 +815,13 @@
 
     try{
 
-        int size = dataIn->readShort() + 1; // add space c++ NULL
-        unsigned char* data = new unsigned char[size];
-        dataIn->readFully( data, 0, size-1 );
-        data[size-1] = 0;  // enforce NULL
+        int size = dataIn->readShort();
+        std::vector<char> data( size );
+        dataIn->readFully( (unsigned char*)&data[0], size );
 
         // Now build a string and copy data into it.
         std::string text;
-        text.resize( size );
-        text.assign( (char*)data, (int)size-1 );
-        delete [] data;
+        text.insert( text.begin(), data.begin(), data.end() );
 
         return text;
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp Mon Feb 22 19:02:35 2010
@@ -336,7 +336,7 @@
                 int size = dataIn.readInt();
                 std::vector<unsigned char> data;
                 data.resize( size );
-                dataIn.readFully( data );
+                dataIn.readFully( &data[0], size );
                 value.setByteArray( data );
                 break;
             }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp Mon Feb 22 19:02:35 2010
@@ -155,7 +155,7 @@
         data.resize( arrayLimit );
 
         // Make sure we get all the data we are expecting
-        dataIn->readFully( &data[0], 0, arrayLimit );
+        dataIn->readFully( &data[0], data.size(), 0, arrayLimit );
 
         clear();
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp Mon Feb 22 19:02:35 2010
@@ -43,7 +43,7 @@
         std::vector<unsigned char> buffer( utfLength );
         std::vector<unsigned char> result( utfLength );
 
-        dataIn.readFully( &buffer[0], 0, utfLength );
+        dataIn.readFully( &buffer[0], utfLength );
 
         int count = 0;
         int index = 0;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp Mon Feb 22 19:02:35 2010
@@ -34,26 +34,15 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void StandardErrorOutputStream::write( unsigned char c )
+void StandardErrorOutputStream::doWriteByte( unsigned char c )
     throw ( decaf::io::IOException ) {
 
     std::cerr << c;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void StandardErrorOutputStream::write( const std::vector<unsigned char>& buffer )
-    throw ( decaf::io::IOException ) {
-
-    if( buffer.empty() ){
-        return;
-    }
-
-    this->write( &buffer[0], buffer.size(), 0, buffer.size() );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StandardErrorOutputStream::write( const unsigned char* buffer, std::size_t size,
-                                       std::size_t offset, std::size_t length )
+void StandardErrorOutputStream::doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+                                                     std::size_t offset, std::size_t length )
     throw ( decaf::io::IOException,
             decaf::lang::exceptions::NullPointerException,
             decaf::lang::exceptions::IndexOutOfBoundsException ) {
@@ -83,3 +72,8 @@
 void StandardErrorOutputStream::flush() throw ( decaf::io::IOException ){
     std::cerr.flush();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void StandardErrorOutputStream::close() throw ( decaf::io::IOException ){
+    std::cerr.flush();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h Mon Feb 22 19:02:35 2010
@@ -19,7 +19,6 @@
 
 #include <decaf/util/Config.h>
 #include <decaf/io/OutputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
 
 namespace decaf{
 namespace internal{
@@ -31,63 +30,13 @@
      * platforms or compilers that do not support <code>std::cerr</code>.
      */
     class DECAF_API StandardErrorOutputStream : public decaf::io::OutputStream {
-    private:
-
-        /**
-         * Synchronization object.
-         */
-        util::concurrent::Mutex mutex;
-
     public:
 
-        /**
-         * Default Constructor
-         */
         StandardErrorOutputStream();
 
         virtual ~StandardErrorOutputStream();
 
         /**
-         * Writes a single byte to the output stream.
-         * @param c the byte.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual void write( unsigned char c )
-            throw ( decaf::io::IOException );
-
-        /**
-         * Writes an array of bytes to the output stream.
-         * @param buffer The bytes to write.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual void write( const std::vector<unsigned char>& buffer )
-            throw ( decaf::io::IOException );
-
-        /**
-         * Writes an array of bytes to the output stream in order starting at buffer[offset]
-         * and proceeding until the number of bytes specified by the length argument are
-         * written or an error occurs.
-         *
-         * @param buffer
-         *      The array of bytes to write.
-         * @param size
-         *      The size of the buffer array passed.
-         * @param offset
-         *      The position to start writing in buffer.
-         * @param length
-         *      The number of bytes from the buffer to be written.
-         *
-         * @throws IOException if an I/O error occurs.
-         * @throws NullPointerException thrown if buffer is Null.
-         * @throws IndexOutOfBoundsException if the offset + length > size.
-         */
-        virtual void write( const unsigned char* buffer, std::size_t size,
-                            std::size_t offset, std::size_t length )
-            throw ( decaf::io::IOException,
-                    decaf::lang::exceptions::NullPointerException,
-                    decaf::lang::exceptions::IndexOutOfBoundsException );
-
-        /**
          * Invokes flush on the target output stream.
          * throws decaf::io::IOException if an error occurs
          */
@@ -97,59 +46,17 @@
          * Invokes close on the target output stream.
          * throws IOException if an error occurs
          */
-        virtual void close() throw( decaf::io::IOException ){
-            this->flush();
-        }
-
-    public:
-
-        virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.lock();
-        }
-
-        virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
-            return mutex.tryLock();
-        }
-
-        virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.unlock();
-        }
-
-        virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
-                                   decaf::lang::exceptions::IllegalMonitorStateException,
-                                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait();
-        }
+        virtual void close() throw( decaf::io::IOException );
 
-        virtual void wait( long long millisecs )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
+    protected:
 
-            mutex.wait( millisecs );
-        }
+        virtual void doWriteByte( unsigned char value ) throw ( decaf::io::IOException );
 
-        virtual void wait( long long millisecs, int nanos )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalArgumentException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait( millisecs, nanos );
-        }
-
-        virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
-                                     decaf::lang::exceptions::IllegalMonitorStateException ) {
-
-            mutex.notify();
-        }
-
-        virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
-                                        decaf::lang::exceptions::IllegalMonitorStateException ) {
-
-            mutex.notifyAll();
-        }
+        virtual void doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+                                          std::size_t offset, std::size_t length )
+            throw ( decaf::io::IOException,
+                    decaf::lang::exceptions::NullPointerException,
+                    decaf::lang::exceptions::IndexOutOfBoundsException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -40,11 +40,11 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 std::size_t StandardInputStream::available() const throw ( decaf::io::IOException ) {
-    return 0;
+    return 1;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int StandardInputStream::read() throw ( decaf::io::IOException ) {
+int StandardInputStream::doReadByte() throw ( decaf::io::IOException ) {
 
     if( !std::cin.good() ) {
         throw decaf::io::IOException(
@@ -54,34 +54,3 @@
 
     return std::cin.get();
 }
-
-////////////////////////////////////////////////////////////////////////////////
-int StandardInputStream::read( unsigned char* buffer,
-                               std::size_t offset DECAF_UNUSED,
-                               std::size_t bufferSize DECAF_UNUSED )
-    throw ( decaf::io::IOException, decaf::lang::exceptions::NullPointerException ) {
-
-    if( buffer == NULL ) {
-        throw NullPointerException(
-            __FILE__, __LINE__,
-            "ByteArrayInputStream::read - Buffer passed is Null" );
-    }
-
-    return 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-std::size_t StandardInputStream::skip( std::size_t num DECAF_UNUSED )
-    throw ( decaf::io::IOException,
-            decaf::lang::exceptions::UnsupportedOperationException ) {
-
-    return 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StandardInputStream::reset() throw ( decaf::io::IOException ) {
-
-    throw decaf::io::IOException(
-        __FILE__, __LINE__,
-        "Mark and Rest not Supported by this class." );
-}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h Mon Feb 22 19:02:35 2010
@@ -20,23 +20,16 @@
 
 #include <decaf/util/Config.h>
 #include <decaf/io/InputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
 
 namespace decaf {
 namespace internal {
 namespace io {
 
     class DECAF_API StandardInputStream : public decaf::io::InputStream {
-    private:
-
-        /**
-         * Synchronization object.
-         */
-        util::concurrent::Mutex mutex;
-
     public:
 
         StandardInputStream();
+
         virtual ~StandardInputStream();
 
         /**
@@ -45,153 +38,9 @@
          */
         virtual std::size_t available() const throw ( decaf::io::IOException );
 
-        /**
-         * Reads a single byte from the buffer.
-         * @return The next byte.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual int read() throw ( decaf::io::IOException );
-
-        /**
-         * Reads an array of bytes from the buffer.
-         *
-         * @param buffer (out) the target buffer.
-         * @param offset the position in the buffer to start reading from.
-         * @param bufferSize the size of the output buffer.
-         *
-         * @return The number of bytes read.
-         *
-         * @throws IOException thrown if an error occurs.
-         * @throws NullPointerException if buffer is null.
-         */
-        virtual int read( unsigned char* buffer,
-                          std::size_t offset,
-                          std::size_t bufferSize )
-            throw ( decaf::io::IOException, decaf::lang::exceptions::NullPointerException );
-
-        /**
-         * Closes the target input stream.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual void close() throw( decaf::io::IOException ) {}
-
-        /**
-         * Skips over and discards n bytes of data from this input stream. The
-         * skip method may, for a variety of reasons, end up skipping over some
-         * smaller number of bytes, possibly 0. This may result from any of a
-         * number of conditions; reaching end of file before n bytes have been
-         * skipped is only one possibility. The actual number of bytes skipped
-         * is returned. If n is negative, no bytes are skipped.
-         * <p>
-         * The skip method of InputStream creates a byte array and then
-         * repeatedly reads into it until n bytes have been read or the end
-         * of the stream has been reached. Subclasses are encouraged to
-         * provide a more efficient implementation of this method.
-         * @param num - the number of bytes to skip
-         * @returns total bytes skipped
-         *
-         * @throws IOException if an error occurs
-         * @throws UnsupportedOperationException
-         *         If skip is not supported.
-         */
-        virtual std::size_t skip( std::size_t num )
-            throw ( decaf::io::IOException,
-                    decaf::lang::exceptions::UnsupportedOperationException );
-
-        /**
-         * Marks the current position in the stream A subsequent call to the
-         * reset method repositions this stream at the last marked position so
-         * that subsequent reads re-read the same bytes.
-         *
-         * If a stream instance reports that marks are supported then the stream
-         * will ensure that the same bytes can be read again after the reset method
-         * is called so long the readLimit is not reached.
-         * @param readLimit - max bytes read before marked position is invalid.
-         */
-        virtual void mark( int readLimit DECAF_UNUSED ) {}
-
-        /**
-         * Repositions this stream to the position at the time the mark method was
-         * last called on this input stream.
-         *
-         * If the method markSupported returns true, then:
-         *   * If the method mark has not been called since the stream was created,
-         *     or the number of bytes read from the stream since mark was last called
-         *     is larger than the argument to mark at that last call, then an
-         *     IOException might be thrown.
-         *   * If such an IOException is not thrown, then the stream is reset to a
-         *     state such that all the bytes read since the most recent call to mark
-         *     (or since the start of the file, if mark has not been called) will be
-         *     resupplied to subsequent callers of the read method, followed by any
-         *     bytes that otherwise would have been the next input data as of the
-         *     time of the call to reset.
-         * If the method markSupported returns false, then:
-         *   * The call to reset may throw an IOException.
-         *   * If an IOException is not thrown, then the stream is reset to a fixed
-         *     state that depends on the particular type of the input stream and how
-         *     it was created. The bytes that will be supplied to subsequent callers
-         *     of the read method depend on the particular type of the input stream.
-         * @throws IOException
-         */
-        virtual void reset() throw ( decaf::io::IOException );
-
-        /**
-         * Determines if this input stream supports the mark and reset methods.
-         * Whether or not mark and reset are supported is an invariant property of
-         * a particular input stream instance.
-         * @returns true if this stream instance supports marks
-         */
-        virtual bool markSupported() const{ return false; }
-
     protected:
 
-        virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.lock();
-        }
-
-        virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
-            return mutex.tryLock();
-        }
-
-        virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.unlock();
-        }
-
-        virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
-                                   decaf::lang::exceptions::IllegalMonitorStateException,
-                                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait();
-        }
-
-        virtual void wait( long long millisecs )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait( millisecs );
-        }
-
-        virtual void wait( long long millisecs, int nanos )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalArgumentException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait( millisecs, nanos );
-        }
-
-        virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
-                                     decaf::lang::exceptions::IllegalMonitorStateException ) {
-
-            mutex.notify();
-        }
-
-        virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
-                                        decaf::lang::exceptions::IllegalMonitorStateException ) {
-
-            mutex.notifyAll();
-        }
+        virtual int doReadByte() throw ( decaf::io::IOException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp Mon Feb 22 19:02:35 2010
@@ -34,42 +34,36 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void StandardOutputStream::write( unsigned char c )
+void StandardOutputStream::doWriteByte( unsigned char c )
     throw ( decaf::io::IOException ) {
 
     std::cout << c;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void StandardOutputStream::write( const std::vector<unsigned char>& buffer )
-    throw ( decaf::io::IOException ) {
+void StandardOutputStream::doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+                                                     std::size_t offset, std::size_t length )
+    throw ( decaf::io::IOException,
+            decaf::lang::exceptions::NullPointerException,
+            decaf::lang::exceptions::IndexOutOfBoundsException ) {
 
-    if( buffer.empty() ){
+    if( length == 0 ) {
         return;
     }
 
-    this->write( &buffer[0], 0, buffer.size() );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StandardOutputStream::write( const unsigned char* buffer,
-                                       std::size_t offset,
-                                       std::size_t len )
-    throw ( decaf::io::IOException, lang::exceptions::NullPointerException ) {
-
     if( buffer == NULL ) {
         throw lang::exceptions::NullPointerException(
             __FILE__, __LINE__,
-            "StandardOutputStream::write - Passed buffer is null." );
+            "StandardErrorOutputStream::write - Passed buffer is null." );
     }
 
-    if( offset > len ) {
-        throw decaf::io::IOException(
+    if( ( offset + length ) > size ) {
+        throw decaf::lang::exceptions::IndexOutOfBoundsException(
             __FILE__, __LINE__,
-            "StandardOutputStream::write - offset passed is greater than length" );
+            "StandardErrorOutputStream::write - given offset + length is greater than buffer size.");
     }
 
-	for( std::size_t i = 0; i < len; ++i ) {
+    for( std::size_t i = 0; i < length; ++i ) {
         std::cout << buffer[i+offset];
     }
 }
@@ -78,3 +72,8 @@
 void StandardOutputStream::flush() throw ( decaf::io::IOException ){
     std::cout.flush();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void StandardOutputStream::close() throw ( decaf::io::IOException ){
+    std::cout.flush();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h Mon Feb 22 19:02:35 2010
@@ -20,53 +20,17 @@
 
 #include <decaf/util/Config.h>
 #include <decaf/io/OutputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
 
 namespace decaf {
 namespace internal {
 namespace io {
 
     class DECAF_API StandardOutputStream : public decaf::io::OutputStream {
-    private:
-
-        /**
-         * Synchronization object.
-         */
-        util::concurrent::Mutex mutex;
-
     public:
 
         StandardOutputStream();
-        virtual ~StandardOutputStream();
 
-        /**
-         * Writes a single byte to the output stream.
-         * @param c the byte.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual void write( unsigned char c )
-            throw ( decaf::io::IOException );
-
-        /**
-         * Writes an array of bytes to the output stream.
-         * @param buffer The bytes to write.
-         * @throws IOException thrown if an error occurs.
-         */
-        virtual void write( const std::vector<unsigned char>& buffer )
-            throw ( decaf::io::IOException );
-
-        /**
-         * Writes an array of bytes to the output stream.
-         * @param buffer The array of bytes to write.
-         * @param offset, the position to start writing in buffer.
-         * @param len The number of bytes from the buffer to be written.
-         * @throws IOException thrown if an error occurs.
-         * @throws NullPointerException if buffer is null.
-         */
-        virtual void write( const unsigned char* buffer,
-                            std::size_t offset,
-                            std::size_t len )
-            throw ( decaf::io::IOException, lang::exceptions::NullPointerException );
+        virtual ~StandardOutputStream();
 
         /**
          * Invokes flush on the target output stream.
@@ -78,59 +42,17 @@
          * Invokes close on the target output stream.
          * throws IOException if an error occurs
          */
-        virtual void close() throw( decaf::io::IOException ){
-            this->flush();
-        }
-
-    public:
-
-        virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.lock();
-        }
-
-        virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
-            return mutex.tryLock();
-        }
-
-        virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.unlock();
-        }
-
-        virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
-                                   decaf::lang::exceptions::IllegalMonitorStateException,
-                                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait();
-        }
-
-        virtual void wait( long long millisecs )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait( millisecs );
-        }
-
-        virtual void wait( long long millisecs, int nanos )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalArgumentException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait( millisecs, nanos );
-        }
-
-        virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
-                                     decaf::lang::exceptions::IllegalMonitorStateException ) {
+        virtual void close() throw( decaf::io::IOException );
 
-            mutex.notify();
-        }
+    protected:
 
-        virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
-                                        decaf::lang::exceptions::IllegalMonitorStateException ) {
+        virtual void doWriteByte( unsigned char value ) throw ( decaf::io::IOException );
 
-            mutex.notifyAll();
-        }
+        virtual void doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+                                          std::size_t offset, std::size_t length )
+            throw ( decaf::io::IOException,
+                    decaf::lang::exceptions::NullPointerException,
+                    decaf::lang::exceptions::IndexOutOfBoundsException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -64,6 +64,11 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+std::size_t BlockingByteArrayInputStream::available() const throw ( decaf::io::IOException ){
+    return std::distance( pos, buffer.end() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void BlockingByteArrayInputStream::close() throw ( io::IOException ){
 
     synchronized( this ){
@@ -80,9 +85,10 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BlockingByteArrayInputStream::read() throw ( IOException ){
+int BlockingByteArrayInputStream::doReadByte() throw ( IOException ){
 
     try{
+
         synchronized( this ){
 
             while( !closing ){
@@ -105,13 +111,13 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BlockingByteArrayInputStream::read( unsigned char* buffer, std::size_t size,
-                                        std::size_t offset, std::size_t length )
+int BlockingByteArrayInputStream::doReadArrayBounded( unsigned char* buffer, std::size_t size,
+                                                      std::size_t offset, std::size_t length )
     throw ( decaf::io::IOException,
             decaf::lang::exceptions::IndexOutOfBoundsException,
             decaf::lang::exceptions::NullPointerException ) {
 
-    if( size == 0 || length == 0 ) {
+    if( length == 0 ) {
         return 0;
     }
 
@@ -127,33 +133,40 @@
             "Given size{%d} - offset{%d} is less than length{%d}.", size, offset, length );
     }
 
-    synchronized( this ){
+    try {
+
+        synchronized( this ){
 
-        std::size_t ix = 0;
+            std::size_t ix = 0;
 
-        for( ; ix < length && !closing; ++ix ) {
+            for( ; ix < length && !closing; ++ix ) {
 
-            if( pos == this->buffer.end() ) {
-                // Wait for more data to come in.
-                wait();
+                if( pos == this->buffer.end() ) {
+                    // Wait for more data to come in.
+                    wait();
+                }
+
+                if( !closing && pos != this->buffer.end() ){
+                    buffer[ix + offset] = *(pos);
+                    ++pos;
+                }
             }
 
-            if( !closing && pos != this->buffer.end() ){
-                buffer[ix + offset] = *(pos);
-                ++pos;
+            if( closing ){
+                throw IOException(
+                    __FILE__, __LINE__,
+                    "BlockingByteArrayInputStream::read - close occurred during read" );
             }
-        }
 
-        if( closing ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "BlockingByteArrayInputStream::read - close occurred during read" );
+            return (int)ix;
         }
 
-        return (int)ix;
+        return 0;
     }
-
-    return 0;
+    DECAF_CATCH_RETHROW( IOException )
+    DECAF_CATCH_RETHROW( NullPointerException )
+    DECAF_CATCH_RETHROW( IndexOutOfBoundsException )
+    DECAF_CATCHALL_THROW( IOException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h Mon Feb 22 19:02:35 2010
@@ -19,7 +19,6 @@
 #define _DECAF_IO_BLOCKINGBYTEARRAYINPUTSTREAM_H_
 
 #include <decaf/io/InputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
 #include <vector>
 
 namespace decaf{
@@ -44,11 +43,6 @@
         std::vector<unsigned char>::const_iterator pos;
 
         /**
-         * Synchronization object.
-         */
-        util::concurrent::Mutex mutex;
-
-        /**
          * Indicates that this stream is in the process of shutting
          * down.
          */
@@ -68,9 +62,6 @@
         BlockingByteArrayInputStream( const unsigned char* buffer,
                                       std::size_t bufferSize );
 
-        /**
-         * Destructor
-         */
         virtual ~BlockingByteArrayInputStream();
 
         /**
@@ -90,41 +81,13 @@
          * @return the data available in the internal buffer.
          * @throws IOException if an error occurs.
          */
-        virtual std::size_t available() const throw ( IOException ){
-            return std::distance( pos, buffer.end() );
-        }
-
-        /**
-         * Reads a single byte from the buffer.  This operation will
-         * block until data has been added to the buffer via a call
-         * to setByteArray.
-         * @return the next byte.
-         * @throws IOException if an error occurs.
-         */
-        virtual int read() throw ( IOException );
-
-        /**
-         * Reads an array of bytes from the buffer.  If the desired amount
-         * of data is not currently available, this operation
-         * will block until the appropriate amount of data is available
-         * in the buffer via a call to setByteArray.
-         * @param buffer (out) the target buffer
-         * @param offset the position in the buffer to start from.
-         * @param bufferSize the size of the output buffer.
-         * @return the number of bytes read. or -1 if EOF
-         * @throws IOException f an error occurs.
-         */
-        virtual int read( unsigned char* buffer, std::size_t size,
-                          std::size_t offset, std::size_t length )
-            throw ( decaf::io::IOException,
-                    decaf::lang::exceptions::IndexOutOfBoundsException,
-                    decaf::lang::exceptions::NullPointerException );
+        virtual std::size_t available() const throw ( decaf::io::IOException );
 
         /**
          * Closes the target input stream.
          * @throws IOException if an error occurs.
          */
-        virtual void close() throw ( io::IOException );
+        virtual void close() throw ( decaf::io::IOException );
 
         /**
          * Skips over and discards n bytes of data from this input stream. The
@@ -143,106 +106,18 @@
          * @throws IOException if an error occurs
          */
         virtual std::size_t skip( std::size_t num )
-            throw ( io::IOException, lang::exceptions::UnsupportedOperationException );
-
-        /**
-         * Marks the current position in the stream A subsequent call to the
-         * reset method repositions this stream at the last marked position so
-         * that subsequent reads re-read the same bytes.
-         *
-         * If a stream instance reports that marks are supported then the stream
-         * will ensure that the same bytes can be read again after the reset method
-         * is called so long the readLimit is not reached.
-         * @param readLimit - max bytes read before marked position is invalid.
-         */
-        virtual void mark( int readLimit DECAF_UNUSED ) {}
-
-        /**
-         * Repositions this stream to the position at the time the mark method was
-         * last called on this input stream.
-         *
-         * If the method markSupported returns true, then:
-         *   * If the method mark has not been called since the stream was created,
-         *     or the number of bytes read from the stream since mark was last called
-         * 	   is larger than the argument to mark at that last call, then an
-         *     IOException might be thrown.
-         *   * If such an IOException is not thrown, then the stream is reset to a
-         *     state such that all the bytes read since the most recent call to mark
-         *     (or since the start of the file, if mark has not been called) will be
-         *     resupplied to subsequent callers of the read method, followed by any
-         *     bytes that otherwise would have been the next input data as of the
-         *     time of the call to reset.
-         * If the method markSupported returns false, then:
-         *   * The call to reset may throw an IOException.
-         *   * If an IOException is not thrown, then the stream is reset to a fixed
-         *     state that depends on the particular type of the input stream and how
-         *     it was created. The bytes that will be supplied to subsequent callers
-         *     of the read method depend on the particular type of the input stream.
-         * @throws IOException
-         */
-        virtual void reset() throw ( IOException ) {
-            throw IOException(
-                __FILE__, __LINE__,
-                "BufferedInputStream::reset - mark no yet supported." );
-        }
-
-        /**
-         * Determines if this input stream supports the mark and reset methods.
-         * Whether or not mark and reset are supported is an invariant property of
-         * a particular input stream instance.
-         * @returns true if this stream instance supports marks
-         */
-        virtual bool markSupported() const{ return false; }
-
-    public:
-
-        virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.lock();
-        }
-
-        virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
-            return mutex.tryLock();
-        }
-
-        virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
-            mutex.unlock();
-        }
-
-        virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
-                                   decaf::lang::exceptions::IllegalMonitorStateException,
-                                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait();
-        }
-
-        virtual void wait( long long millisecs )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait( millisecs );
-        }
-
-        virtual void wait( long long millisecs, int nanos )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalArgumentException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            mutex.wait( millisecs, nanos );
-        }
-
-        virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
-                                     decaf::lang::exceptions::IllegalMonitorStateException ) {
+            throw ( decaf::io::IOException,
+                    decaf::lang::exceptions::UnsupportedOperationException );
 
-            mutex.notify();
-        }
+    protected:
 
-        virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
-                                        decaf::lang::exceptions::IllegalMonitorStateException ) {
+        virtual int doReadByte() throw ( IOException );
 
-            mutex.notifyAll();
-        }
+        virtual int doReadArrayBounded( unsigned char* buffer, std::size_t size,
+                                        std::size_t offset, std::size_t length )
+            throw ( decaf::io::IOException,
+                    decaf::lang::exceptions::IndexOutOfBoundsException,
+                    decaf::lang::exceptions::NullPointerException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -16,14 +16,10 @@
  */
 
 #include "BufferedInputStream.h"
-#include <algorithm>
 
-#ifdef HAVE_STRING_H
-#include <string.h>
-#endif
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
+#include <decaf/lang/System.h>
+
+#include <algorithm>
 
 using namespace std;
 using namespace decaf;
@@ -32,257 +28,466 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace decaf{
+namespace io{
+
+    class StreamBuffer {
+    private:
+
+        unsigned char* buffer;
+        std::size_t bufferSize;
+        std::size_t pos;
+        std::size_t count;
+        std::size_t markLimit;
+        long long markPos;
+
+    public:
+
+        StreamBuffer( std::size_t bufferSize ) {
+
+            this->buffer = new unsigned char[bufferSize];
+            this->bufferSize = bufferSize;
+            this->pos = 0;
+            this->count = 0;
+            this->markLimit = 0;
+            this->markPos = -1;
+        }
+
+        ~StreamBuffer() {
+            delete [] this->buffer;
+        }
+
+        void resize( std::size_t newSize ) {
+            unsigned char* temp = new unsigned char[newSize];
+            System::arraycopy( temp, 0, buffer, 0, count );
+            std::swap( temp, buffer );
+            delete [] temp;
+            this->bufferSize = newSize;
+        }
+
+        std::size_t getUnusedBytes() const{
+            return bufferSize - count;
+        }
+
+        std::size_t available() const {
+            return this->count - this->pos;
+        }
+
+        unsigned char next() {
+            return this->buffer[this->pos++];
+        }
+
+        void advance( std::size_t amount ) {
+            this->pos += amount;
+        }
+
+        void reverse( std::size_t amount ) {
+            this->pos -= amount;
+        }
+
+        void advanceTail( std::size_t amount ) {
+            this->count += amount;
+        }
+
+        std::size_t getBufferSize() {
+            return this->bufferSize;
+        }
+
+        unsigned char* getBuffer() {
+            return this->buffer;
+        }
+
+        std::size_t getCount() const{
+            return count;
+        }
+
+        void setCount( std::size_t count ) {
+            this->count = count;
+        }
+
+        std::size_t getPos() const{
+            return pos;
+        }
+
+        void setPos( std::size_t pos ) {
+            this->pos = pos;
+        }
+
+        void clear(){
+            pos = count = 0;
+        }
+
+        bool isEmpty() const{
+            return pos == count;
+        }
+
+        bool isMarked() const{
+            return this->markPos != -1;
+        }
+
+        void reset() {
+            this->pos = this->markPos;
+        }
+
+        void normalizeBuffer() {
+            if( isEmpty() ){
+                clear();
+            }
+        }
+
+        void mark( int readLimit ) {
+            this->markLimit = readLimit;
+            this->markPos = this->pos;
+        }
+
+        void unmark() {
+            this->markLimit = 0;
+            this->markPos = -1;
+        }
+
+        bool isReadLimitExceeded() const {
+            return pos - markPos >= markLimit;
+        }
+
+        std::size_t getMarkPos() const {
+            return (std::size_t)this->markPos;
+        }
+
+        void setMarkPos( std::size_t markPos ) {
+            this->markPos = markPos;
+        }
+
+        std::size_t getMarkLimit() const {
+            return this->markLimit;
+        }
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
 BufferedInputStream::BufferedInputStream( InputStream* stream, bool own )
 : FilterInputStream( stream, own ) {
-    // Default to a 1k buffer.
-    init( 1024 );
+
+    // Default to a 8k buffer.
+    this->buffer = new StreamBuffer( 8192 );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-BufferedInputStream::BufferedInputStream( InputStream* stream,
-                                          std::size_t bufferSize,
-                                          bool own )
-    throw ( lang::exceptions::IllegalArgumentException )
-
-: FilterInputStream( stream, own ) {
+BufferedInputStream::BufferedInputStream( InputStream* stream, std::size_t bufferSize, bool own )
+    throw ( lang::exceptions::IllegalArgumentException ) : FilterInputStream( stream, own ) {
 
-    try {
-        this->init( bufferSize );
+    if( bufferSize == 0 ) {
+        throw new IllegalArgumentException(
+            __FILE__, __LINE__,
+            "BufferedInputStream::init - Size must be greater than zero");
     }
-    DECAF_CATCH_RETHROW( IllegalArgumentException )
-    DECAF_CATCHALL_THROW( IllegalArgumentException )
+
+    this->buffer = new StreamBuffer( bufferSize );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 BufferedInputStream::~BufferedInputStream() {
     try{
         this->close();
-
-        // Destroy the buffer.
-        if( buffer != NULL ){
-            delete [] buffer;
-            buffer = NULL;
-        }
+        delete this->buffer;
     }
     DECAF_CATCH_NOTHROW( IOException )
     DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void BufferedInputStream::init( std::size_t bufferSize ){
+void BufferedInputStream::close() throw( IOException ) {
 
-    if( bufferSize <= 0 ) {
-        throw new IllegalArgumentException(
+    // let parent close the inputStream
+    FilterInputStream::close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::size_t BufferedInputStream::available() const throw ( IOException ) {
+
+    if( buffer == NULL || this->isClosed() ) {
+        throw IOException(
             __FILE__, __LINE__,
-            "BufferedInputStream::init - Size must be greater than zero");
+            "BufferedInputStream::available - Buffer was closed");
     }
 
-    this->bufferSize = bufferSize;
+    return buffer->available() + inputStream->available();
+}
 
-    // Create the buffer and initialize the head and tail positions.
-    this->buffer = new unsigned char[bufferSize];
-    this->head = 0;
-    this->tail = 0;
-    this->markLimit = 0;
-    this->markpos = -1;
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::mark( int readLimit ) {
+    if( this->buffer != NULL ) {
+        this->buffer->mark( readLimit );
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void BufferedInputStream::close() throw( IOException ) {
+void BufferedInputStream::reset() throw ( IOException ) {
 
-    // let parent close the inputStream
-    FilterInputStream::close();
+    if( this->isClosed() ) {
+        throw IOException(
+            __FILE__, __LINE__,
+            "BufferedInputStream::reset - This stream has been closed." );
+    }
+
+    if( !this->buffer->isMarked() ) {
+        throw IOException(
+            __FILE__, __LINE__,
+            "BufferedInputStream::reset - The mark position was invalidated." );
+    }
+
+    this->buffer->reset();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::read() throw ( IOException ){
+int BufferedInputStream::doReadByte() throw ( IOException ){
 
     try{
 
-        if( isClosed() ){
+        // Use a local reference in case of unsynchronized close.
+        InputStream* inputStream = this->inputStream;
+
+        if( isClosed() || inputStream == NULL ){
             throw IOException(
                 __FILE__, __LINE__,
-                "BufferedInputStream::bufferData - Stream is clsoed" );
+                "BufferedInputStream::bufferData - Stream is closed" );
         }
 
-        // If there's no data left, reset to pointers to the beginning of the
-        // buffer.
-        normalizeBuffer();
-
-        // If we don't have any data buffered yet - read as much as
-        // we can.
-        if( isEmpty() ){
+        // Are there buffered bytes available?  Or can we read more?
+        if( this->buffer->isEmpty() && bufferData( inputStream ) == -1 ) {
+            return -1;
+        }
 
-            // If we hit EOF without getting any Data, then throw IOException
-            if( bufferData() == -1 ) {
-                return -1;
-            }
+        // Stream might have closed while we were buffering.
+        if( isClosed() ){
+            throw IOException(
+                __FILE__, __LINE__,
+                "BufferedInputStream::bufferData - Stream is closed" );
         }
 
-        // Get the next character.
-        char returnValue = buffer[head++];
+        if( !this->buffer->isEmpty() ) {
+            return this->buffer->next();
+        }
 
-        return returnValue;
+        return -1;
     }
     DECAF_CATCH_RETHROW( IOException )
     DECAF_CATCHALL_THROW( IOException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::read( unsigned char* buffer, std::size_t size, std::size_t offset, std::size_t length )
+int BufferedInputStream::doReadArrayBounded( unsigned char* buffer, std::size_t size,
+                                             std::size_t offset, std::size_t length )
     throw ( decaf::io::IOException,
             decaf::lang::exceptions::IndexOutOfBoundsException,
             decaf::lang::exceptions::NullPointerException ) {
 
     try{
 
+        // Use a local reference in case of unsynchronized close.
+        InputStream* inputStream = this->inputStream;
+
         if( isClosed() ){
             throw IOException(
-                __FILE__, __LINE__,
-                "BufferedInputStream::bufferData - Stream is clsoed" );
+                __FILE__, __LINE__, "Stream is closed" );
         }
 
-        // For zero, do nothing
-        if( size == 0 || length == 0 ) {
-            return 0;
+        if( buffer == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "Buffer passed was NULL." );
         }
 
-        if( length > size - offset ) {
+        if( offset + length > size ) {
             throw IndexOutOfBoundsException(
                 __FILE__, __LINE__,
                 "Given size{%d} - offset{%d} is less than length{%d}.", size, offset, length );
         }
 
-        if( size - offset < length ) {
-            throw IndexOutOfBoundsException(
-                __FILE__, __LINE__, "Given size{%d} - offset{%d} > length{%d}.", size, offset, length );
+        // For zero, do nothing
+        if( length == 0 ) {
+            return 0;
+        }
+
+        if( inputStream == NULL ){
+            throw IOException(
+                __FILE__, __LINE__, "Stream is closed" );
         }
 
-        // If there's no data left, reset to pointers to the beginning of the
-        // buffer.
-        normalizeBuffer();
+        std::size_t required = 0;
+
+        // There are bytes available in the buffer so use them up first and
+        // then we check to see if any are available on the stream, if not
+        // then just return what we had.
+        if( !this->buffer->isEmpty() ) {
+
+            std::size_t copylength =
+                this->buffer->available() >= length ? length : this->buffer->available();
+
+            System::arraycopy( this->buffer->getBuffer(), this->buffer->getPos(),
+                               buffer, offset, copylength );
+            this->buffer->advance( copylength );
+
+            if( copylength == length || inputStream->available() == 0 ) {
+                return copylength;
+            }
 
-        // If we still haven't filled the output buffer AND there is data
-        // on the input stream to be read, read a buffer's worth from the stream.
-        std::size_t totalRead = 0;
-        while( totalRead < length ){
+            offset += copylength;
+            required = length - copylength;
+        } else {
+            required = length;
+        }
 
-            // Get the remaining bytes to copy.
-            std::size_t bytesToCopy = min( tail-head, (length-totalRead) );
+        while( true ) {
 
-            // Copy the data to the output buffer.
-            memcpy( buffer+totalRead+offset, this->buffer+head, bytesToCopy );
+            int read = 0;
 
-            // Increment the total bytes read.
-            totalRead += bytesToCopy;
+            // If we're not marked and the required size is greater than the
+            // buffer, simply read the bytes directly bypassing the buffer.
+            if( !this->buffer->isMarked() && required >= this->buffer->getBufferSize() ) {
 
-            // Increment the head position.
-            head += bytesToCopy;
+                read = inputStream->read( buffer, size, offset, required );
+                if( read == -1 ) {
+                    return required == length ? -1 : length - required;
+                }
 
-            // If the buffer is now empty, reset the positions to the
-            // head of the buffer.
-            normalizeBuffer();
+            } else {
 
-            // If we still haven't satisified the request,
-            // read more data.
-            if( totalRead < length ){
+                if( bufferData( inputStream ) == -1 ) {
+                    return required == length ? -1 : length - required;
+                }
 
-                // Buffer as much data as we can, return EOF if we hit it.
-                if( bufferData() == -1 ) {
-                    return -1;
+                // Stream might have closed while we were buffering.
+                if( isClosed() ){
+                    throw IOException(
+                        __FILE__, __LINE__,
+                        "BufferedInputStream::bufferData - Stream is closed" );
                 }
+
+                read = this->buffer->available() >= required ? required : this->buffer->available();
+                System::arraycopy( this->buffer->getBuffer(), this->buffer->getPos(),
+                                   buffer, offset, read );
+                this->buffer->advance( read );
             }
-        }
 
-        // Return the total number of bytes read.
-        return (int)totalRead;
+            required -= read;
+
+            if( required == 0 ) {
+                return length;
+            }
+
+            if( inputStream->available() == 0 ) {
+                return length - required;
+            }
+
+            offset += read;
+        }
     }
     DECAF_CATCH_RETHROW( IOException )
     DECAF_CATCH_RETHROW( NullPointerException )
+    DECAF_CATCH_RETHROW( IndexOutOfBoundsException )
     DECAF_CATCHALL_THROW( IOException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-std::size_t BufferedInputStream::skip( std::size_t num )
+std::size_t BufferedInputStream::skip( std::size_t amount )
     throw ( IOException, lang::exceptions::UnsupportedOperationException ){
 
     try{
 
-        if( isClosed() ){
+        if( amount == 0 ) {
+            return 0;
+        }
+
+        // Use a local reference in case of unsynchronized close.
+        InputStream* inputStream = this->inputStream;
+
+        if( isClosed() || inputStream == NULL ){
             throw IOException(
                 __FILE__, __LINE__,
-                "BufferedInputStream::skip - Stream is clsoed" );
+                "BufferedInputStream::bufferData - Stream is closed" );
         }
 
-        // If there's no data left, reset to pointers to the beginning of the
-        // buffer.
-        normalizeBuffer();
+        if( buffer->available() >= amount ) {
+            buffer->advance( amount );
+            return amount;
+        }
 
-        // loop until we've skipped the desired number of bytes
-        std::size_t totalSkipped = 0;
-        while( totalSkipped < num ){
+        int read = buffer->available();
 
-            // Get the remaining bytes to copy.
-            std::size_t bytesToSkip = min( tail-head, num-totalSkipped );
+        buffer->advance( buffer->getCount() );
 
-            // Increment the head position.
-            head += bytesToSkip;
-            totalSkipped += bytesToSkip;
+        if( buffer->isMarked() ) {
 
-            // If the buffer is now empty, reset the positions to the
-            // head of the buffer.
-            normalizeBuffer();
+            if( amount <= buffer->getMarkLimit() ) {
 
-            // If we still haven't satisified the request,
-            // read more data.
-            if( totalSkipped < num ){
+                if( bufferData( inputStream ) == -1 ) {
+                    return read;
+                }
 
-                // Buffer as much data as we can.
-                bufferData();
+                if( buffer->available() >= ( amount - read ) ) {
+                    buffer->advance( amount - read );
+                    return amount;
+                }
+
+                // Couldn't get all the bytes, skip what we read
+                read += buffer->available();
+                buffer->advance( buffer->getCount() );
+
+                return read;
             }
         }
 
-        // Return the total number of bytes read.
-        return totalSkipped;
+        return read + inputStream->skip( amount - read );
     }
     DECAF_CATCH_RETHROW( IOException )
     DECAF_CATCHALL_THROW( IOException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::bufferData() throw ( IOException ){
+int BufferedInputStream::bufferData( InputStream* inputStream ) throw ( IOException ){
 
     try{
-        if( getUnusedBytes() == 0 ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "BufferedInputStream::bufferData - buffer full" );
+
+        if( !buffer->isMarked() || buffer->isReadLimitExceeded() ) {
+            // Mark position not set or exceeded readlimit
+            int result = inputStream->read( buffer->getBuffer(), buffer->getBufferSize() );
+            if( result > 0 ) {
+                buffer->unmark();
+                buffer->clear();
+                buffer->advanceTail( result == -1 ? 0 : result );
+            }
+
+            return result;
         }
 
-        // Get the number of bytes currently available on the input stream
-        // that could be read without blocking.
-        std::size_t available = inputStream->available();
+        std::size_t markPos = buffer->getMarkPos();
+        std::size_t markLimit = buffer->getMarkLimit();
 
-        // Calculate the number of bytes that we can read.  Always >= 1 byte!
-        std::size_t bytesToRead = max( (std::size_t)1, min( available, getUnusedBytes() ) );
+        if( markPos == 0 && markLimit > buffer->getBufferSize() ) {
 
-        // Read the bytes from the input stream.
-        int bytesRead = inputStream->read( buffer, bufferSize, tail, bytesToRead );
-        if( bytesRead == 0 ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "BufferedInputStream::read() - failed reading bytes from stream");
+            // Increase buffer size to accommodate the readlimit.
+            std::size_t newLength = buffer->getBufferSize() * 2;
+            if( newLength > markLimit ) {
+                newLength = markLimit;
+            }
+            this->buffer->resize( newLength );
+        } else if( markPos > 0 ) {
+            System::arraycopy( buffer->getBuffer(), markPos,
+                               buffer->getBuffer(), 0, buffer->getBufferSize() - markPos );
         }
 
-        // Dont add -1 to tail if we hit EOF
-        if( bytesRead == -1 ) {
-            return bytesRead;
-        }
+        // Set the new position and mark position
+        buffer->reverse( markPos );
+        buffer->setCount( 0 );
+        buffer->setMarkPos( 0 );
+
+        int bytesread = inputStream->read( buffer->getBuffer(), buffer->getBufferSize(),
+                                           buffer->getPos(), buffer->getBufferSize() - buffer->getPos() );
 
-        // Increment the tail to the new end position.
-        tail += bytesRead;
+        buffer->setCount( bytesread <= 0 ? buffer->getPos() : buffer->getPos() + bytesread );
 
-        return bytesRead;
+        return bytesread;
     }
     DECAF_CATCH_RETHROW( IOException )
     DECAF_CATCHALL_THROW( IOException )



Mime
View raw message