Author: tabish
Date: Mon Feb 22 19:02:35 2010
New Revision: 915018
URL: http://svn.apache.org/viewvc?rev=915018&view=rev
Log:
More refactoring in the I/O layer: adds better error checking and reduces cut-and-paste code.
Adds more unit tests for the I/O layer.
Attempts to get a little more performance out of the I/O code that does bulk moves and copies.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStream.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStream.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/InputStreamTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/InputStreamTest.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/OutputStreamTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/OutputStreamTest.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedOutputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedOutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayOutputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/ByteArrayOutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/DataInput.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/DataInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/DataInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStreamReader.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/InputStreamReader.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStreamWriter.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/OutputStreamWriter.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Reader.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Reader.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Writer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/Writer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/System.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/System.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketInputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketOutputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/SocketOutputStream.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-benchmarks/decaf/io/ByteArrayOutputStreamBenchmark.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/BufferedInputStreamTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/BufferedInputStreamTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/BufferedOutputStreamTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/DataInputStreamTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/DataOutputStreamTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterInputStreamTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterInputStreamTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterOutputStreamTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/FilterOutputStreamTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/ReaderTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/io/WriterTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Feb 22 19:02:35 2010
@@ -517,7 +517,11 @@
decaf/io/ByteArrayOutputStream.cpp \
decaf/io/DataInputStream.cpp \
decaf/io/DataOutputStream.cpp \
+ decaf/io/FilterInputStream.cpp \
+ decaf/io/FilterOutputStream.cpp \
+ decaf/io/InputStream.cpp \
decaf/io/InputStreamReader.cpp \
+ decaf/io/OutputStream.cpp \
decaf/io/OutputStreamWriter.cpp \
decaf/io/Reader.cpp \
decaf/io/Writer.cpp \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp Mon Feb 22 19:02:35 2010
@@ -322,7 +322,7 @@
if( length <= (size_t)this->remainingBytes ) {
// small buffer
this->remainingBytes -= (int)length;
- this->dataIn->readFully( buffer, 0, length );
+ this->dataIn->readFully( buffer, length );
return length;
} else {
// big buffer
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/Message.cpp Mon Feb 22 19:02:35 2010
@@ -284,10 +284,14 @@
stream << ", ";
stream << "BrokerOutTime = " << this->getBrokerOutTime();
- stream << " Value of ackHandler = " << ackHandler.get() << std::endl;
- stream << " Value of properties = " << this->properties.toString() << std::endl;
- stream << " Value of readOnlyBody = " << this->readOnlyBody << std::endl;
- stream << " Value of readOnlyProperties = " << this->readOnlyBody << std::endl;
+ stream << ", ";
+ stream << " ackHandler = " << ackHandler.get() << std::endl;
+ stream << ", ";
+ stream << " properties = " << this->properties.toString() << std::endl;
+ stream << ", ";
+ stream << " readOnlyBody = " << this->readOnlyBody << std::endl;
+ stream << ", ";
+ stream << " readOnlyProperties = " << this->readOnlyBody << std::endl;
stream << " }";
return stream.str();
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -36,10 +36,10 @@
LoggingInputStream::~LoggingInputStream() {}
////////////////////////////////////////////////////////////////////////////////
-int LoggingInputStream::read() throw ( IOException ) {
+int LoggingInputStream::doReadByte() throw ( IOException ) {
try {
- unsigned char c = FilterInputStream::read();
+ unsigned char c = FilterInputStream::doReadByte();
log( &c, 1 );
return c;
}
@@ -48,7 +48,8 @@
}
////////////////////////////////////////////////////////////////////////////////
-int LoggingInputStream::read( unsigned char* buffer, std::size_t size, std::size_t offset, std::size_t length )
+int LoggingInputStream::doReadArrayBounded( unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::IndexOutOfBoundsException,
decaf::lang::exceptions::NullPointerException ) {
@@ -71,13 +72,15 @@
"Given size{%d} - offset{%d} is less than length{%d}.", size, offset, length );
}
- std::size_t numRead = FilterInputStream::read( buffer, size, offset, length );
+ std::size_t numRead = FilterInputStream::doReadArrayBounded( buffer, size, offset, length );
log( buffer, numRead );
return (int)numRead;
}
AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCH_RETHROW( IndexOutOfBoundsException )
+ AMQ_CATCH_RETHROW( NullPointerException )
AMQ_CATCHALL_THROW( IOException )
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingInputStream.h Mon Feb 22 19:02:35 2010
@@ -44,26 +44,12 @@
virtual ~LoggingInputStream();
- /**
- * Reads a single byte from the buffer. Blocks until
- * data is available.
- * @return The next byte.
- * @throws IOException thrown if an error occurs.
- */
- virtual int read() throw ( decaf::io::IOException );
+ protected:
- /**
- * Reads an array of bytes from the buffer. Blocks until
- * the requested number of bytes are available.
- * @param buffer (out) the target buffer.
- * @param offset the position in the buffer to start at
- * @param bufferSize the size of the output buffer.
- * @return The number of bytes read or -1 if EOF is detected
- * @throws IOException thrown if an error occurs.
- * @throws NullPointerException if buffer is null
- */
- virtual int read( unsigned char* buffer, std::size_t size,
- std::size_t offset, std::size_t length )
+ virtual int doReadByte() throw ( decaf::io::IOException );
+
+ virtual int doReadArrayBounded( unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::IndexOutOfBoundsException,
decaf::lang::exceptions::NullPointerException );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.cpp Mon Feb 22 19:02:35 2010
@@ -36,19 +36,19 @@
LoggingOutputStream::~LoggingOutputStream() {}
////////////////////////////////////////////////////////////////////////////////
-void LoggingOutputStream::write( const unsigned char c ) throw ( IOException ) {
+void LoggingOutputStream::doWriteByte( const unsigned char c ) throw ( IOException ) {
try {
log( &c, 1 );
- FilterOutputStream::write( c );
+ FilterOutputStream::doWriteByte( c );
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
-void LoggingOutputStream::write( const unsigned char* buffer, std::size_t size,
- std::size_t offset, std::size_t length )
+void LoggingOutputStream::doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::NullPointerException,
decaf::lang::exceptions::IndexOutOfBoundsException ) {
@@ -73,7 +73,7 @@
log( buffer + offset, length );
- FilterOutputStream::write( buffer, size, offset, length );
+ FilterOutputStream::doWriteArrayBounded( buffer, size, offset, length );
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_RETHROW( NullPointerException )
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/LoggingOutputStream.h Mon Feb 22 19:02:35 2010
@@ -49,33 +49,12 @@
virtual ~LoggingOutputStream();
- /**
- * Writes a single byte to the output stream.
- * @param c the byte.
- * @throws IOException thrown if an error occurs.
- */
- virtual void write( unsigned char c ) throw ( decaf::io::IOException );
+ protected:
- /**
- * Writes an array of bytes to the output stream in order starting at buffer[offset]
- * and proceeding until the number of bytes specified by the length argument are
- * written or an error occurs.
- *
- * @param buffer
- * The array of bytes to write.
- * @param size
- * The size of the buffer array passed.
- * @param offset
- * The position to start writing in buffer.
- * @param length
- * The number of bytes from the buffer to be written.
- *
- * @throws IOException if an I/O error occurs.
- * @throws NullPointerException thrown if buffer is Null.
- * @throws IndexOutOfBoundsException if the offset + length > size.
- */
- virtual void write( const unsigned char* buffer, std::size_t size,
- std::size_t offset, std::size_t length )
+ virtual void doWriteByte( unsigned char c ) throw ( decaf::io::IOException );
+
+ virtual void doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::NullPointerException,
decaf::lang::exceptions::IndexOutOfBoundsException );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp Mon Feb 22 19:02:35 2010
@@ -694,7 +694,7 @@
if( bs->readBoolean() ) {
int size = dataIn->readInt();
data.resize( size );
- dataIn->readFully( data );
+ dataIn->readFully( &data[0], data.size() );
}
return data;
@@ -715,7 +715,7 @@
int size = dataIn->readInt();
std::vector<unsigned char> data;
data.resize( size );
- dataIn->readFully( data );
+ dataIn->readFully( &data[0], data.size() );
return data;
}
@@ -736,7 +736,7 @@
try{
std::vector<unsigned char> data;
data.resize( size );
- dataIn->readFully( data );
+ dataIn->readFully( &data[0], data.size() );
return data;
}
AMQ_CATCH_RETHROW( IOException )
@@ -753,7 +753,7 @@
try{
std::vector<unsigned char> data;
data.resize( size );
- dataIn->readFully( data );
+ dataIn->readFully( &data[0], data.size() );
return data;
}
AMQ_CATCH_RETHROW( IOException )
@@ -815,16 +815,13 @@
try{
- int size = dataIn->readShort() + 1; // add space c++ NULL
- unsigned char* data = new unsigned char[size];
- dataIn->readFully( data, 0, size-1 );
- data[size-1] = 0; // enforce NULL
+ int size = dataIn->readShort();
+ std::vector<char> data( size );
+ dataIn->readFully( (unsigned char*)&data[0], size );
// Now build a string and copy data into it.
std::string text;
- text.resize( size );
- text.assign( (char*)data, (int)size-1 );
- delete [] data;
+ text.insert( text.begin(), data.begin(), data.end() );
return text;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/marshal/PrimitiveTypesMarshaller.cpp Mon Feb 22 19:02:35 2010
@@ -336,7 +336,7 @@
int size = dataIn.readInt();
std::vector<unsigned char> data;
data.resize( size );
- dataIn.readFully( data );
+ dataIn.readFully( &data[0], size );
value.setByteArray( data );
break;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/BooleanStream.cpp Mon Feb 22 19:02:35 2010
@@ -155,7 +155,7 @@
data.resize( arrayLimit );
// Make sure we get all the data we are expecting
- dataIn->readFully( &data[0], 0, arrayLimit );
+ dataIn->readFully( &data[0], data.size(), 0, arrayLimit );
clear();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/utils/OpenwireStringSupport.cpp Mon Feb 22 19:02:35 2010
@@ -43,7 +43,7 @@
std::vector<unsigned char> buffer( utfLength );
std::vector<unsigned char> result( utfLength );
- dataIn.readFully( &buffer[0], 0, utfLength );
+ dataIn.readFully( &buffer[0], utfLength );
int count = 0;
int index = 0;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.cpp Mon Feb 22 19:02:35 2010
@@ -34,26 +34,15 @@
}
////////////////////////////////////////////////////////////////////////////////
-void StandardErrorOutputStream::write( unsigned char c )
+void StandardErrorOutputStream::doWriteByte( unsigned char c )
throw ( decaf::io::IOException ) {
std::cerr << c;
}
////////////////////////////////////////////////////////////////////////////////
-void StandardErrorOutputStream::write( const std::vector<unsigned char>& buffer )
- throw ( decaf::io::IOException ) {
-
- if( buffer.empty() ){
- return;
- }
-
- this->write( &buffer[0], buffer.size(), 0, buffer.size() );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StandardErrorOutputStream::write( const unsigned char* buffer, std::size_t size,
- std::size_t offset, std::size_t length )
+void StandardErrorOutputStream::doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::NullPointerException,
decaf::lang::exceptions::IndexOutOfBoundsException ) {
@@ -83,3 +72,8 @@
void StandardErrorOutputStream::flush() throw ( decaf::io::IOException ){
std::cerr.flush();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void StandardErrorOutputStream::close() throw ( decaf::io::IOException ){
+ std::cerr.flush();
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardErrorOutputStream.h Mon Feb 22 19:02:35 2010
@@ -19,7 +19,6 @@
#include <decaf/util/Config.h>
#include <decaf/io/OutputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
namespace decaf{
namespace internal{
@@ -31,63 +30,13 @@
* platforms or compilers that do not support <code>std::cerr</code>.
*/
class DECAF_API StandardErrorOutputStream : public decaf::io::OutputStream {
- private:
-
- /**
- * Synchronization object.
- */
- util::concurrent::Mutex mutex;
-
public:
- /**
- * Default Constructor
- */
StandardErrorOutputStream();
virtual ~StandardErrorOutputStream();
/**
- * Writes a single byte to the output stream.
- * @param c the byte.
- * @throws IOException thrown if an error occurs.
- */
- virtual void write( unsigned char c )
- throw ( decaf::io::IOException );
-
- /**
- * Writes an array of bytes to the output stream.
- * @param buffer The bytes to write.
- * @throws IOException thrown if an error occurs.
- */
- virtual void write( const std::vector<unsigned char>& buffer )
- throw ( decaf::io::IOException );
-
- /**
- * Writes an array of bytes to the output stream in order starting at buffer[offset]
- * and proceeding until the number of bytes specified by the length argument are
- * written or an error occurs.
- *
- * @param buffer
- * The array of bytes to write.
- * @param size
- * The size of the buffer array passed.
- * @param offset
- * The position to start writing in buffer.
- * @param length
- * The number of bytes from the buffer to be written.
- *
- * @throws IOException if an I/O error occurs.
- * @throws NullPointerException thrown if buffer is Null.
- * @throws IndexOutOfBoundsException if the offset + length > size.
- */
- virtual void write( const unsigned char* buffer, std::size_t size,
- std::size_t offset, std::size_t length )
- throw ( decaf::io::IOException,
- decaf::lang::exceptions::NullPointerException,
- decaf::lang::exceptions::IndexOutOfBoundsException );
-
- /**
* Invokes flush on the target output stream.
* throws decaf::io::IOException if an error occurs
*/
@@ -97,59 +46,17 @@
* Invokes close on the target output stream.
* throws IOException if an error occurs
*/
- virtual void close() throw( decaf::io::IOException ){
- this->flush();
- }
-
- public:
-
- virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.lock();
- }
-
- virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
- return mutex.tryLock();
- }
-
- virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.unlock();
- }
-
- virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait();
- }
+ virtual void close() throw( decaf::io::IOException );
- virtual void wait( long long millisecs )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
+ protected:
- mutex.wait( millisecs );
- }
+ virtual void doWriteByte( unsigned char value ) throw ( decaf::io::IOException );
- virtual void wait( long long millisecs, int nanos )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalArgumentException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait( millisecs, nanos );
- }
-
- virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
-
- mutex.notify();
- }
-
- virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
-
- mutex.notifyAll();
- }
+ virtual void doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
+ throw ( decaf::io::IOException,
+ decaf::lang::exceptions::NullPointerException,
+ decaf::lang::exceptions::IndexOutOfBoundsException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -40,11 +40,11 @@
////////////////////////////////////////////////////////////////////////////////
std::size_t StandardInputStream::available() const throw ( decaf::io::IOException ) {
- return 0;
+ return 1;
}
////////////////////////////////////////////////////////////////////////////////
-int StandardInputStream::read() throw ( decaf::io::IOException ) {
+int StandardInputStream::doReadByte() throw ( decaf::io::IOException ) {
if( !std::cin.good() ) {
throw decaf::io::IOException(
@@ -54,34 +54,3 @@
return std::cin.get();
}
-
-////////////////////////////////////////////////////////////////////////////////
-int StandardInputStream::read( unsigned char* buffer,
- std::size_t offset DECAF_UNUSED,
- std::size_t bufferSize DECAF_UNUSED )
- throw ( decaf::io::IOException, decaf::lang::exceptions::NullPointerException ) {
-
- if( buffer == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__,
- "ByteArrayInputStream::read - Buffer passed is Null" );
- }
-
- return 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-std::size_t StandardInputStream::skip( std::size_t num DECAF_UNUSED )
- throw ( decaf::io::IOException,
- decaf::lang::exceptions::UnsupportedOperationException ) {
-
- return 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StandardInputStream::reset() throw ( decaf::io::IOException ) {
-
- throw decaf::io::IOException(
- __FILE__, __LINE__,
- "Mark and Rest not Supported by this class." );
-}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardInputStream.h Mon Feb 22 19:02:35 2010
@@ -20,23 +20,16 @@
#include <decaf/util/Config.h>
#include <decaf/io/InputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
namespace decaf {
namespace internal {
namespace io {
class DECAF_API StandardInputStream : public decaf::io::InputStream {
- private:
-
- /**
- * Synchronization object.
- */
- util::concurrent::Mutex mutex;
-
public:
StandardInputStream();
+
virtual ~StandardInputStream();
/**
@@ -45,153 +38,9 @@
*/
virtual std::size_t available() const throw ( decaf::io::IOException );
- /**
- * Reads a single byte from the buffer.
- * @return The next byte.
- * @throws IOException thrown if an error occurs.
- */
- virtual int read() throw ( decaf::io::IOException );
-
- /**
- * Reads an array of bytes from the buffer.
- *
- * @param buffer (out) the target buffer.
- * @param offset the position in the buffer to start reading from.
- * @param bufferSize the size of the output buffer.
- *
- * @return The number of bytes read.
- *
- * @throws IOException thrown if an error occurs.
- * @throws NullPointerException if buffer is null.
- */
- virtual int read( unsigned char* buffer,
- std::size_t offset,
- std::size_t bufferSize )
- throw ( decaf::io::IOException, decaf::lang::exceptions::NullPointerException );
-
- /**
- * Closes the target input stream.
- * @throws IOException thrown if an error occurs.
- */
- virtual void close() throw( decaf::io::IOException ) {}
-
- /**
- * Skips over and discards n bytes of data from this input stream. The
- * skip method may, for a variety of reasons, end up skipping over some
- * smaller number of bytes, possibly 0. This may result from any of a
- * number of conditions; reaching end of file before n bytes have been
- * skipped is only one possibility. The actual number of bytes skipped
- * is returned. If n is negative, no bytes are skipped.
- * <p>
- * The skip method of InputStream creates a byte array and then
- * repeatedly reads into it until n bytes have been read or the end
- * of the stream has been reached. Subclasses are encouraged to
- * provide a more efficient implementation of this method.
- * @param num - the number of bytes to skip
- * @returns total bytes skipped
- *
- * @throws IOException if an error occurs
- * @throws UnsupportedOperationException
- * If skip is not supported.
- */
- virtual std::size_t skip( std::size_t num )
- throw ( decaf::io::IOException,
- decaf::lang::exceptions::UnsupportedOperationException );
-
- /**
- * Marks the current position in the stream A subsequent call to the
- * reset method repositions this stream at the last marked position so
- * that subsequent reads re-read the same bytes.
- *
- * If a stream instance reports that marks are supported then the stream
- * will ensure that the same bytes can be read again after the reset method
- * is called so long the readLimit is not reached.
- * @param readLimit - max bytes read before marked position is invalid.
- */
- virtual void mark( int readLimit DECAF_UNUSED ) {}
-
- /**
- * Repositions this stream to the position at the time the mark method was
- * last called on this input stream.
- *
- * If the method markSupported returns true, then:
- * * If the method mark has not been called since the stream was created,
- * or the number of bytes read from the stream since mark was last called
- * is larger than the argument to mark at that last call, then an
- * IOException might be thrown.
- * * If such an IOException is not thrown, then the stream is reset to a
- * state such that all the bytes read since the most recent call to mark
- * (or since the start of the file, if mark has not been called) will be
- * resupplied to subsequent callers of the read method, followed by any
- * bytes that otherwise would have been the next input data as of the
- * time of the call to reset.
- * If the method markSupported returns false, then:
- * * The call to reset may throw an IOException.
- * * If an IOException is not thrown, then the stream is reset to a fixed
- * state that depends on the particular type of the input stream and how
- * it was created. The bytes that will be supplied to subsequent callers
- * of the read method depend on the particular type of the input stream.
- * @throws IOException
- */
- virtual void reset() throw ( decaf::io::IOException );
-
- /**
- * Determines if this input stream supports the mark and reset methods.
- * Whether or not mark and reset are supported is an invariant property of
- * a particular input stream instance.
- * @returns true if this stream instance supports marks
- */
- virtual bool markSupported() const{ return false; }
-
protected:
- virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.lock();
- }
-
- virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
- return mutex.tryLock();
- }
-
- virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.unlock();
- }
-
- virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait();
- }
-
- virtual void wait( long long millisecs )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait( millisecs );
- }
-
- virtual void wait( long long millisecs, int nanos )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalArgumentException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait( millisecs, nanos );
- }
-
- virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
-
- mutex.notify();
- }
-
- virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
-
- mutex.notifyAll();
- }
+ virtual int doReadByte() throw ( decaf::io::IOException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.cpp Mon Feb 22 19:02:35 2010
@@ -34,42 +34,36 @@
}
////////////////////////////////////////////////////////////////////////////////
-void StandardOutputStream::write( unsigned char c )
+void StandardOutputStream::doWriteByte( unsigned char c )
throw ( decaf::io::IOException ) {
+ // Insert the single character into std::cout; no explicit flush is done here.
std::cout << c;
}
////////////////////////////////////////////////////////////////////////////////
-void StandardOutputStream::write( const std::vector<unsigned char>& buffer )
- throw ( decaf::io::IOException ) {
+void StandardOutputStream::doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
+ throw ( decaf::io::IOException,
+ decaf::lang::exceptions::NullPointerException,
+ decaf::lang::exceptions::IndexOutOfBoundsException ) {
+ // Writes buffer[offset .. offset + length) to std::cout.
+ //
+ // buffer - source bytes; only checked for NULL when length is non-zero.
+ // size - total number of bytes available in buffer.
+ // offset - index of the first byte to write.
+ // length - number of bytes to write; zero is a no-op.
+ //
+ // Throws NullPointerException when buffer is NULL and
+ // IndexOutOfBoundsException when the requested range does not fit in size.
- if( buffer.empty() ){
+ if( length == 0 ) {
return;
}
- this->write( &buffer[0], 0, buffer.size() );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StandardOutputStream::write( const unsigned char* buffer,
- std::size_t offset,
- std::size_t len )
- throw ( decaf::io::IOException, lang::exceptions::NullPointerException ) {
-
if( buffer == NULL ) {
throw lang::exceptions::NullPointerException(
__FILE__, __LINE__,
"StandardOutputStream::write - Passed buffer is null." );
}
- if( offset > len ) {
- throw decaf::io::IOException(
+ // Written so that the check cannot wrap around: offset + length may
+ // overflow std::size_t, size - length cannot once length <= size holds.
+ if( length > size || offset > size - length ) {
+ throw decaf::lang::exceptions::IndexOutOfBoundsException(
__FILE__, __LINE__,
- "StandardOutputStream::write - offset passed is greater than length" );
+ "StandardOutputStream::write - given offset + length is greater than buffer size.");
}
- for( std::size_t i = 0; i < len; ++i ) {
+ for( std::size_t i = 0; i < length; ++i ) {
std::cout << buffer[i+offset];
}
}
@@ -78,3 +72,8 @@
void StandardOutputStream::flush() throw ( decaf::io::IOException ){
std::cout.flush();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void StandardOutputStream::close() throw ( decaf::io::IOException ){
+ // Closing this stream only flushes std::cout; nothing else is done here.
+ std::cout << std::flush;
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/io/StandardOutputStream.h Mon Feb 22 19:02:35 2010
@@ -20,53 +20,17 @@
#include <decaf/util/Config.h>
#include <decaf/io/OutputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
namespace decaf {
namespace internal {
namespace io {
class DECAF_API StandardOutputStream : public decaf::io::OutputStream {
- private:
-
- /**
- * Synchronization object.
- */
- util::concurrent::Mutex mutex;
-
public:
StandardOutputStream();
- virtual ~StandardOutputStream();
- /**
- * Writes a single byte to the output stream.
- * @param c the byte.
- * @throws IOException thrown if an error occurs.
- */
- virtual void write( unsigned char c )
- throw ( decaf::io::IOException );
-
- /**
- * Writes an array of bytes to the output stream.
- * @param buffer The bytes to write.
- * @throws IOException thrown if an error occurs.
- */
- virtual void write( const std::vector<unsigned char>& buffer )
- throw ( decaf::io::IOException );
-
- /**
- * Writes an array of bytes to the output stream.
- * @param buffer The array of bytes to write.
- * @param offset, the position to start writing in buffer.
- * @param len The number of bytes from the buffer to be written.
- * @throws IOException thrown if an error occurs.
- * @throws NullPointerException if buffer is null.
- */
- virtual void write( const unsigned char* buffer,
- std::size_t offset,
- std::size_t len )
- throw ( decaf::io::IOException, lang::exceptions::NullPointerException );
+ virtual ~StandardOutputStream();
/**
* Invokes flush on the target output stream.
@@ -78,59 +42,17 @@
* Invokes close on the target output stream.
* throws IOException if an error occurs
*/
- virtual void close() throw( decaf::io::IOException ){
- this->flush();
- }
-
- public:
-
- virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.lock();
- }
-
- virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
- return mutex.tryLock();
- }
-
- virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.unlock();
- }
-
- virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait();
- }
-
- virtual void wait( long long millisecs )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait( millisecs );
- }
-
- virtual void wait( long long millisecs, int nanos )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalArgumentException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait( millisecs, nanos );
- }
-
- virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
+ virtual void close() throw( decaf::io::IOException );
- mutex.notify();
- }
+ protected:
- virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
+ virtual void doWriteByte( unsigned char value ) throw ( decaf::io::IOException );
- mutex.notifyAll();
- }
+ virtual void doWriteArrayBounded( const unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
+ throw ( decaf::io::IOException,
+ decaf::lang::exceptions::NullPointerException,
+ decaf::lang::exceptions::IndexOutOfBoundsException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -64,6 +64,11 @@
}
////////////////////////////////////////////////////////////////////////////////
+std::size_t BlockingByteArrayInputStream::available() const throw ( decaf::io::IOException ){
+ // Number of bytes between the read cursor and the end of the internal buffer.
+ const std::size_t remaining = buffer.end() - pos;
+ return remaining;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void BlockingByteArrayInputStream::close() throw ( io::IOException ){
synchronized( this ){
@@ -80,9 +85,10 @@
}
////////////////////////////////////////////////////////////////////////////////
-int BlockingByteArrayInputStream::read() throw ( IOException ){
+int BlockingByteArrayInputStream::doReadByte() throw ( IOException ){
try{
+
synchronized( this ){
while( !closing ){
@@ -105,13 +111,13 @@
}
////////////////////////////////////////////////////////////////////////////////
-int BlockingByteArrayInputStream::read( unsigned char* buffer, std::size_t size,
- std::size_t offset, std::size_t length )
+int BlockingByteArrayInputStream::doReadArrayBounded( unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::IndexOutOfBoundsException,
decaf::lang::exceptions::NullPointerException ) {
- if( size == 0 || length == 0 ) {
+ if( length == 0 ) {
return 0;
}
@@ -127,33 +133,40 @@
"Given size{%d} - offset{%d} is less than length{%d}.", size, offset, length );
}
- synchronized( this ){
+ try {
+
+ synchronized( this ){
- std::size_t ix = 0;
+ std::size_t ix = 0;
- for( ; ix < length && !closing; ++ix ) {
+ for( ; ix < length && !closing; ++ix ) {
- if( pos == this->buffer.end() ) {
- // Wait for more data to come in.
- wait();
+ if( pos == this->buffer.end() ) {
+ // Wait for more data to come in.
+ wait();
+ }
+
+ if( !closing && pos != this->buffer.end() ){
+ buffer[ix + offset] = *(pos);
+ ++pos;
+ }
}
- if( !closing && pos != this->buffer.end() ){
- buffer[ix + offset] = *(pos);
- ++pos;
+ if( closing ){
+ throw IOException(
+ __FILE__, __LINE__,
+ "BlockingByteArrayInputStream::read - close occurred during read" );
}
- }
- if( closing ){
- throw IOException(
- __FILE__, __LINE__,
- "BlockingByteArrayInputStream::read - close occurred during read" );
+ return (int)ix;
}
- return (int)ix;
+ return 0;
}
-
- return 0;
+ DECAF_CATCH_RETHROW( IOException )
+ DECAF_CATCH_RETHROW( NullPointerException )
+ DECAF_CATCH_RETHROW( IndexOutOfBoundsException )
+ DECAF_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BlockingByteArrayInputStream.h Mon Feb 22 19:02:35 2010
@@ -19,7 +19,6 @@
#define _DECAF_IO_BLOCKINGBYTEARRAYINPUTSTREAM_H_
#include <decaf/io/InputStream.h>
-#include <decaf/util/concurrent/Mutex.h>
#include <vector>
namespace decaf{
@@ -44,11 +43,6 @@
std::vector<unsigned char>::const_iterator pos;
/**
- * Synchronization object.
- */
- util::concurrent::Mutex mutex;
-
- /**
* Indicates that this stream is in the process of shutting
* down.
*/
@@ -68,9 +62,6 @@
BlockingByteArrayInputStream( const unsigned char* buffer,
std::size_t bufferSize );
- /**
- * Destructor
- */
virtual ~BlockingByteArrayInputStream();
/**
@@ -90,41 +81,13 @@
* @return the data available in the internal buffer.
* @throws IOException if an error occurs.
*/
- virtual std::size_t available() const throw ( IOException ){
- return std::distance( pos, buffer.end() );
- }
-
- /**
- * Reads a single byte from the buffer. This operation will
- * block until data has been added to the buffer via a call
- * to setByteArray.
- * @return the next byte.
- * @throws IOException if an error occurs.
- */
- virtual int read() throw ( IOException );
-
- /**
- * Reads an array of bytes from the buffer. If the desired amount
- * of data is not currently available, this operation
- * will block until the appropriate amount of data is available
- * in the buffer via a call to setByteArray.
- * @param buffer (out) the target buffer
- * @param offset the position in the buffer to start from.
- * @param bufferSize the size of the output buffer.
- * @return the number of bytes read. or -1 if EOF
- * @throws IOException f an error occurs.
- */
- virtual int read( unsigned char* buffer, std::size_t size,
- std::size_t offset, std::size_t length )
- throw ( decaf::io::IOException,
- decaf::lang::exceptions::IndexOutOfBoundsException,
- decaf::lang::exceptions::NullPointerException );
+ virtual std::size_t available() const throw ( decaf::io::IOException );
/**
* Closes the target input stream.
* @throws IOException if an error occurs.
*/
- virtual void close() throw ( io::IOException );
+ virtual void close() throw ( decaf::io::IOException );
/**
* Skips over and discards n bytes of data from this input stream. The
@@ -143,106 +106,18 @@
* @throws IOException if an error occurs
*/
virtual std::size_t skip( std::size_t num )
- throw ( io::IOException, lang::exceptions::UnsupportedOperationException );
-
- /**
- * Marks the current position in the stream A subsequent call to the
- * reset method repositions this stream at the last marked position so
- * that subsequent reads re-read the same bytes.
- *
- * If a stream instance reports that marks are supported then the stream
- * will ensure that the same bytes can be read again after the reset method
- * is called so long the readLimit is not reached.
- * @param readLimit - max bytes read before marked position is invalid.
- */
- virtual void mark( int readLimit DECAF_UNUSED ) {}
-
- /**
- * Repositions this stream to the position at the time the mark method was
- * last called on this input stream.
- *
- * If the method markSupported returns true, then:
- * * If the method mark has not been called since the stream was created,
- * or the number of bytes read from the stream since mark was last called
- * is larger than the argument to mark at that last call, then an
- * IOException might be thrown.
- * * If such an IOException is not thrown, then the stream is reset to a
- * state such that all the bytes read since the most recent call to mark
- * (or since the start of the file, if mark has not been called) will be
- * resupplied to subsequent callers of the read method, followed by any
- * bytes that otherwise would have been the next input data as of the
- * time of the call to reset.
- * If the method markSupported returns false, then:
- * * The call to reset may throw an IOException.
- * * If an IOException is not thrown, then the stream is reset to a fixed
- * state that depends on the particular type of the input stream and how
- * it was created. The bytes that will be supplied to subsequent callers
- * of the read method depend on the particular type of the input stream.
- * @throws IOException
- */
- virtual void reset() throw ( IOException ) {
- throw IOException(
- __FILE__, __LINE__,
- "BufferedInputStream::reset - mark no yet supported." );
- }
-
- /**
- * Determines if this input stream supports the mark and reset methods.
- * Whether or not mark and reset are supported is an invariant property of
- * a particular input stream instance.
- * @returns true if this stream instance supports marks
- */
- virtual bool markSupported() const{ return false; }
-
- public:
-
- virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.lock();
- }
-
- virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
- return mutex.tryLock();
- }
-
- virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
- mutex.unlock();
- }
-
- virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait();
- }
-
- virtual void wait( long long millisecs )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait( millisecs );
- }
-
- virtual void wait( long long millisecs, int nanos )
- throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalArgumentException,
- decaf::lang::exceptions::IllegalMonitorStateException,
- decaf::lang::exceptions::InterruptedException ) {
-
- mutex.wait( millisecs, nanos );
- }
-
- virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
+ throw ( decaf::io::IOException,
+ decaf::lang::exceptions::UnsupportedOperationException );
- mutex.notify();
- }
+ protected:
- virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
- decaf::lang::exceptions::IllegalMonitorStateException ) {
+ virtual int doReadByte() throw ( IOException );
- mutex.notifyAll();
- }
+ virtual int doReadArrayBounded( unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
+ throw ( decaf::io::IOException,
+ decaf::lang::exceptions::IndexOutOfBoundsException,
+ decaf::lang::exceptions::NullPointerException );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp?rev=915018&r1=915017&r2=915018&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp Mon Feb 22 19:02:35 2010
@@ -16,14 +16,10 @@
*/
#include "BufferedInputStream.h"
-#include <algorithm>
-#ifdef HAVE_STRING_H
-#include <string.h>
-#endif
-#ifdef HAVE_STRINGS_H
-#include <strings.h>
-#endif
+#include <decaf/lang/System.h>
+
+#include <algorithm>
using namespace std;
using namespace decaf;
@@ -32,257 +28,466 @@
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
+namespace decaf{
+namespace io{
+
+    /**
+     * Heap-allocated byte buffer together with the read cursor, fill level
+     * and mark bookkeeping used by BufferedInputStream.
+     *
+     * Bytes in the index range [pos, count) are buffered and not yet
+     * consumed.  A markPos of -1 means that no mark is currently set.
+     */
+    class StreamBuffer {
+    private:
+
+        unsigned char* buffer;
+        std::size_t bufferSize;
+        std::size_t pos;
+        std::size_t count;
+        std::size_t markLimit;
+        long long markPos;
+
+    private:
+
+        // The raw array is owned by this object, so a member-wise copy would
+        // end in a double delete.  Declared but not defined to disable copying.
+        StreamBuffer( const StreamBuffer& );
+        StreamBuffer& operator= ( const StreamBuffer& );
+
+    public:
+
+        /**
+         * Allocates an empty buffer of the given capacity with no mark set.
+         * @param bufferSize capacity of the backing array in bytes.
+         */
+        StreamBuffer( std::size_t bufferSize ) :
+            buffer( new unsigned char[bufferSize] ),
+            bufferSize( bufferSize ),
+            pos( 0 ),
+            count( 0 ),
+            markLimit( 0 ),
+            markPos( -1 ) {
+        }
+
+        ~StreamBuffer() {
+            delete [] this->buffer;
+        }
+
+        /**
+         * Replaces the backing array with one of newSize bytes, carrying over
+         * the first count bytes of the current array.  pos, count and the mark
+         * state are left unchanged.
+         *
+         * newSize is not validated here: the copy always moves count bytes, so
+         * callers must pass newSize >= count.
+         */
+        void resize( std::size_t newSize ) {
+            unsigned char* temp = new unsigned char[newSize];
+            // arraycopy takes (src, srcPos, dest, destPos, length): the data
+            // has to flow from the current array into the freshly allocated one.
+            System::arraycopy( buffer, 0, temp, 0, count );
+            std::swap( temp, buffer );
+            // After the swap temp refers to the old array.
+            delete [] temp;
+            this->bufferSize = newSize;
+        }
+
+        /** @return capacity that has not been filled yet (bufferSize - count). */
+        std::size_t getUnusedBytes() const{
+            return bufferSize - count;
+        }
+
+        /** @return number of buffered bytes not yet consumed (count - pos). */
+        std::size_t available() const {
+            return this->count - this->pos;
+        }
+
+        /** Returns the byte at pos and moves pos forward by one; no bounds check. */
+        unsigned char next() {
+            return this->buffer[this->pos++];
+        }
+
+        /** Moves the read cursor forward by amount; no bounds check. */
+        void advance( std::size_t amount ) {
+            this->pos += amount;
+        }
+
+        /** Moves the read cursor back by amount; no bounds check. */
+        void reverse( std::size_t amount ) {
+            this->pos -= amount;
+        }
+
+        /** Grows the fill level by amount; no bounds check. */
+        void advanceTail( std::size_t amount ) {
+            this->count += amount;
+        }
+
+        std::size_t getBufferSize() {
+            return this->bufferSize;
+        }
+
+        /** @return the backing array; ownership stays with this object. */
+        unsigned char* getBuffer() {
+            return this->buffer;
+        }
+
+        std::size_t getCount() const{
+            return count;
+        }
+
+        void setCount( std::size_t count ) {
+            this->count = count;
+        }
+
+        std::size_t getPos() const{
+            return pos;
+        }
+
+        void setPos( std::size_t pos ) {
+            this->pos = pos;
+        }
+
+        /** Drops all buffered data by zeroing pos and count; the mark is untouched. */
+        void clear(){
+            pos = count = 0;
+        }
+
+        /** @return true when every buffered byte has been consumed (pos == count). */
+        bool isEmpty() const{
+            return pos == count;
+        }
+
+        /** @return true when a mark position is recorded (markPos != -1). */
+        bool isMarked() const{
+            return this->markPos != -1;
+        }
+
+        /**
+         * Moves the read cursor back to the marked position.  Does not check
+         * isMarked(); the caller is expected to have done so.
+         */
+        void reset() {
+            this->pos = this->markPos;
+        }
+
+        /** Rewinds pos and count to zero once all buffered bytes are consumed. */
+        void normalizeBuffer() {
+            if( isEmpty() ){
+                clear();
+            }
+        }
+
+        /**
+         * Records the current read cursor as the mark position.
+         * @param readLimit stored as the mark limit; converted to std::size_t
+         *        without a range check.
+         */
+        void mark( int readLimit ) {
+            this->markLimit = readLimit;
+            this->markPos = this->pos;
+        }
+
+        /** Forgets the mark: markLimit becomes 0 and markPos becomes -1. */
+        void unmark() {
+            this->markLimit = 0;
+            this->markPos = -1;
+        }
+
+        /**
+         * Compares the distance between pos and markPos with markLimit.
+         * NOTE(review): the subtraction mixes std::size_t and long long, so
+         * the result presumably only makes sense while a mark is set and
+         * pos >= markPos - confirm against the callers.
+         */
+        bool isReadLimitExceeded() const {
+            return pos - markPos >= markLimit;
+        }
+
+        std::size_t getMarkPos() const {
+            return (std::size_t)this->markPos;
+        }
+
+        void setMarkPos( std::size_t markPos ) {
+            this->markPos = markPos;
+        }
+
+        std::size_t getMarkLimit() const {
+            return this->markLimit;
+        }
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
BufferedInputStream::BufferedInputStream( InputStream* stream, bool own )
: FilterInputStream( stream, own ) {
- // Default to a 1k buffer.
- init( 1024 );
+
+ // Default to an 8k (8192 byte) internal buffer.
+ this->buffer = new StreamBuffer( 8192 );
}
////////////////////////////////////////////////////////////////////////////////
-BufferedInputStream::BufferedInputStream( InputStream* stream,
- std::size_t bufferSize,
- bool own )
- throw ( lang::exceptions::IllegalArgumentException )
-
-: FilterInputStream( stream, own ) {
+BufferedInputStream::BufferedInputStream( InputStream* stream, std::size_t bufferSize, bool own )
+ throw ( lang::exceptions::IllegalArgumentException ) : FilterInputStream( stream, own ) {
- try {
- this->init( bufferSize );
+ // Wraps the given stream with an internal buffer of bufferSize bytes.
+ // A bufferSize of zero is rejected with IllegalArgumentException.
+ if( bufferSize == 0 ) {
+ // Thrown by value: a 'throw new' would throw a pointer, which does not
+ // match the exception specification above and leaks the object.
+ throw IllegalArgumentException(
+ __FILE__, __LINE__,
+ "BufferedInputStream::init - Size must be greater than zero");
}
- DECAF_CATCH_RETHROW( IllegalArgumentException )
- DECAF_CATCHALL_THROW( IllegalArgumentException )
+
+ this->buffer = new StreamBuffer( bufferSize );
}
////////////////////////////////////////////////////////////////////////////////
BufferedInputStream::~BufferedInputStream() {
try{
this->close();
-
- // Destroy the buffer.
- if( buffer != NULL ){
- delete [] buffer;
- buffer = NULL;
- }
}
DECAF_CATCH_NOTHROW( IOException )
DECAF_CATCHALL_NOTHROW()
+
+ // Release the buffer outside the try block: inside it the delete would be
+ // skipped, and the buffer leaked, whenever close() throws.
+ delete this->buffer;
}
////////////////////////////////////////////////////////////////////////////////
-void BufferedInputStream::init( std::size_t bufferSize ){
+void BufferedInputStream::close() throw( IOException ) {
- if( bufferSize <= 0 ) {
- throw new IllegalArgumentException(
+ // Delegate to the parent class, which closes the wrapped inputStream.
+ // The internal buffer is not touched here.
+ FilterInputStream::close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::size_t BufferedInputStream::available() const throw ( IOException ) {
+
+ // A missing buffer or a closed stream is reported as an IOException.
+ if( buffer == NULL || this->isClosed() ) {
+ throw IOException(
__FILE__, __LINE__,
- "BufferedInputStream::init - Size must be greater than zero");
+ "BufferedInputStream::available - Buffer was closed");
}
- this->bufferSize = bufferSize;
+ // Unread bytes held in the internal buffer plus whatever the wrapped
+ // stream itself reports as available.
+ return buffer->available() + inputStream->available();
+}
- // Create the buffer and initialize the head and tail positions.
- this->buffer = new unsigned char[bufferSize];
- this->head = 0;
- this->tail = 0;
- this->markLimit = 0;
- this->markpos = -1;
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::mark( int readLimit ) {
+ // Record the current buffer position and the read limit in the internal
+ // buffer; does nothing when no buffer exists.
+ if( this->buffer != NULL ) {
+ this->buffer->mark( readLimit );
+ }
}
////////////////////////////////////////////////////////////////////////////////
-void BufferedInputStream::close() throw( IOException ) {
+void BufferedInputStream::reset() throw ( IOException ) {
- // let parent close the inputStream
- FilterInputStream::close();
+ // A closed stream cannot be reset.
+ if( this->isClosed() ) {
+ throw IOException(
+ __FILE__, __LINE__,
+ "BufferedInputStream::reset - This stream has been closed." );
+ }
+
+ // Without a recorded mark there is no position to go back to.
+ if( !this->buffer->isMarked() ) {
+ throw IOException(
+ __FILE__, __LINE__,
+ "BufferedInputStream::reset - The mark position was invalidated." );
+ }
+
+ // Move the buffer's read position back to the marked position.
+ this->buffer->reset();
}
////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::read() throw ( IOException ){
+int BufferedInputStream::doReadByte() throw ( IOException ){
+ // Returns the next buffered byte, or -1 when bufferData() reports -1 or
+ // the buffer is still empty after the refill attempt. Throws IOException
+ // when the stream is closed.
try{
- if( isClosed() ){
+ // Work on a local copy of the stream pointer in case of an unsynchronized close.
+ InputStream* inputStream = this->inputStream;
+
+ if( isClosed() || inputStream == NULL ){
throw IOException(
__FILE__, __LINE__,
- "BufferedInputStream::bufferData - Stream is clsoed" );
+ "BufferedInputStream::bufferData - Stream is closed" );
}
- // If there's no data left, reset to pointers to the beginning of the
- // buffer.
- normalizeBuffer();
-
- // If we don't have any data buffered yet - read as much as
- // we can.
- if( isEmpty() ){
+ // Only ask bufferData() for more when the buffer holds no unread bytes;
+ // a result of -1 from it is passed straight back to the caller.
+ if( this->buffer->isEmpty() && bufferData( inputStream ) == -1 ) {
+ return -1;
+ }
- // If we hit EOF without getting any Data, then throw IOException
- if( bufferData() == -1 ) {
- return -1;
- }
+ // Stream might have closed while we were buffering.
+ if( isClosed() ){
+ throw IOException(
+ __FILE__, __LINE__,
+ "BufferedInputStream::bufferData - Stream is closed" );
}
- // Get the next character.
- char returnValue = buffer[head++];
+ // next() yields an unsigned char, so the value returned here is never negative.
+ if( !this->buffer->isEmpty() ) {
+ return this->buffer->next();
+ }
- return returnValue;
+ return -1;
}
DECAF_CATCH_RETHROW( IOException )
DECAF_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// doReadArrayBounded() (renamed from the four-argument read() by this hunk).
//
// Copies up to `length` bytes into the caller's array starting at `offset`.
// Returns the number of bytes delivered, 0 when `length` is 0, or -1 when
// nothing was delivered and the source reported -1.
// Throws IOException (closed stream / NULL wrapped stream),
// NullPointerException (NULL destination) and IndexOutOfBoundsException
// (offset + length exceeds size).
//
// Naming caution: the parameter `buffer` is the caller's destination array and
// shadows the member; the internal buffer object is always written as
// `this->buffer` in this function.
-int BufferedInputStream::read( unsigned char* buffer, std::size_t size, std::size_t offset, std::size_t length )
+int BufferedInputStream::doReadArrayBounded( unsigned char* buffer, std::size_t size,
+ std::size_t offset, std::size_t length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::IndexOutOfBoundsException,
decaf::lang::exceptions::NullPointerException ) {
try{
+ // Use a local reference in case of unsynchronized close.
+ InputStream* inputStream = this->inputStream;
+
// Validation order in the new code: closed check, NULL destination, bounds,
// zero-length shortcut, and only then the NULL check on the wrapped stream.
if( isClosed() ){
throw IOException(
- __FILE__, __LINE__,
- "BufferedInputStream::bufferData - Stream is clsoed" );
+ __FILE__, __LINE__, "Stream is closed" );
}
- // For zero, do nothing
- if( size == 0 || length == 0 ) {
- return 0;
+ if( buffer == NULL ) {
+ throw NullPointerException(
+ __FILE__, __LINE__,
+ "Buffer passed was NULL." );
}
// NOTE(review): `offset + length` is evaluated in std::size_t and can wrap for
// very large arguments; the removed form `length > size - offset` wraps
// instead when offset > size. Neither form guards both cases on its own.
// NOTE(review): the format string below uses %d while the arguments are
// std::size_t -- confirm how the exception constructor formats its arguments.
- if( length > size - offset ) {
+ if( offset + length > size ) {
throw IndexOutOfBoundsException(
__FILE__, __LINE__,
"Given size{%d} - offset{%d} is less than length{%d}.", size, offset, length );
}
- if( size - offset < length ) {
- throw IndexOutOfBoundsException(
- __FILE__, __LINE__, "Given size{%d} - offset{%d} > length{%d}.", size, offset, length );
+ // For zero, do nothing
+ if( length == 0 ) {
+ return 0;
+ }
+
+ if( inputStream == NULL ){
+ throw IOException(
+ __FILE__, __LINE__, "Stream is closed" );
}
- // If there's no data left, reset to pointers to the beginning of the
- // buffer.
- normalizeBuffer();
// `required` tracks how many bytes are still owed to the caller.
+ std::size_t required = 0;
+
+ // There are bytes available in the buffer so use them up first and
+ // then we check to see if any are available on the stream, if not
+ // then just return what we had.
+ if( !this->buffer->isEmpty() ) {
+
// Phase 1: hand over min(buffered, length) bytes from the internal buffer.
+ std::size_t copylength =
+ this->buffer->available() >= length ? length : this->buffer->available();
+
+ System::arraycopy( this->buffer->getBuffer(), this->buffer->getPos(),
+ buffer, offset, copylength );
+ this->buffer->advance( copylength );
+
// Stop here when the request is satisfied or the wrapped stream reports
// nothing available (presumably to avoid blocking -- confirm).
// The std::size_t count is returned through the int return type.
+ if( copylength == length || inputStream->available() == 0 ) {
+ return copylength;
+ }
- // If we still haven't filled the output buffer AND there is data
- // on the input stream to be read, read a buffer's worth from the stream.
- std::size_t totalRead = 0;
- while( totalRead < length ){
+ offset += copylength;
+ required = length - copylength;
+ } else {
+ required = length;
+ }
- // Get the remaining bytes to copy.
- std::size_t bytesToCopy = min( tail-head, (length-totalRead) );
// Phase 2: keep pulling from the wrapped stream until the request is met,
// the source returns -1, or the source reports nothing available.
+ while( true ) {
- // Copy the data to the output buffer.
- memcpy( buffer+totalRead+offset, this->buffer+head, bytesToCopy );
+ int read = 0;
- // Increment the total bytes read.
- totalRead += bytesToCopy;
+ // If we're not marked and the required size is greater than the
+ // buffer, simply read the bytes directly bypassing the buffer.
+ if( !this->buffer->isMarked() && required >= this->buffer->getBufferSize() ) {
- // Increment the head position.
- head += bytesToCopy;
+ read = inputStream->read( buffer, size, offset, required );
// -1 is returned only if nothing has been delivered so far; otherwise the
// partial count (length - required) is returned.
+ if( read == -1 ) {
+ return required == length ? -1 : length - required;
+ }
- // If the buffer is now empty, reset the positions to the
- // head of the buffer.
- normalizeBuffer();
+ } else {
- // If we still haven't satisified the request,
- // read more data.
- if( totalRead < length ){
// Buffered path: refill the internal buffer, then copy out of it.
+ if( bufferData( inputStream ) == -1 ) {
+ return required == length ? -1 : length - required;
+ }
- // Buffer as much data as we can, return EOF if we hit it.
- if( bufferData() == -1 ) {
- return -1;
+ // Stream might have closed while we were buffering.
+ if( isClosed() ){
+ throw IOException(
+ __FILE__, __LINE__,
+ "BufferedInputStream::bufferData - Stream is closed" );
}
+
// NOTE(review): `read` is an int that receives std::size_t values here.
+ read = this->buffer->available() >= required ? required : this->buffer->available();
+ System::arraycopy( this->buffer->getBuffer(), this->buffer->getPos(),
+ buffer, offset, read );
+ this->buffer->advance( read );
}
- }
- // Return the total number of bytes read.
- return (int)totalRead;
+ required -= read;
+
+ if( required == 0 ) {
+ return length;
+ }
+
// Nothing more available right now: return the partial count.
+ if( inputStream->available() == 0 ) {
+ return length - required;
+ }
+
+ offset += read;
+ }
}
DECAF_CATCH_RETHROW( IOException )
DECAF_CATCH_RETHROW( NullPointerException )
+ DECAF_CATCH_RETHROW( IndexOutOfBoundsException )
DECAF_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// skip(): advances past up to `amount` bytes and returns how many were
// skipped. The parameter is renamed from `num` to `amount` by this hunk.
// Throws IOException when the stream is closed or the wrapped stream pointer
// is NULL -- but only for a non-zero `amount`, because the zero shortcut
// below runs before the closed check.
-std::size_t BufferedInputStream::skip( std::size_t num )
+std::size_t BufferedInputStream::skip( std::size_t amount )
throw ( IOException, lang::exceptions::UnsupportedOperationException ){
try{
- if( isClosed() ){
+ if( amount == 0 ) {
+ return 0;
+ }
+
+ // Use a local reference in case of unsynchronized close.
+ InputStream* inputStream = this->inputStream;
+
+ if( isClosed() || inputStream == NULL ){
throw IOException(
__FILE__, __LINE__,
// NOTE(review): the new message names bufferData although it is thrown from
// skip (the removed text said "skip"). Confirm this is intended.
- "BufferedInputStream::skip - Stream is clsoed" );
+ "BufferedInputStream::bufferData - Stream is closed" );
}
- // If there's no data left, reset to pointers to the beginning of the
- // buffer.
- normalizeBuffer();
// Fast path: the whole request is satisfied from already-buffered bytes.
+ if( buffer->available() >= amount ) {
+ buffer->advance( amount );
+ return amount;
+ }
- // loop until we've skipped the desired number of bytes
- std::size_t totalSkipped = 0;
- while( totalSkipped < num ){
// Otherwise consume everything buffered and remember how much that was.
// NOTE(review): `read` is an int holding a std::size_t value.
+ int read = buffer->available();
- // Get the remaining bytes to copy.
- std::size_t bytesToSkip = min( tail-head, num-totalSkipped );
// NOTE(review): the tally above uses available() while the advance below
// uses getCount(); confirm against the buffer class that these agree.
+ buffer->advance( buffer->getCount() );
- // Increment the head position.
- head += bytesToSkip;
- totalSkipped += bytesToSkip;
// With a mark set and `amount` within the mark limit, the skipped bytes are
// routed through bufferData() (presumably so the mark stays valid -- confirm).
// Only one refill is attempted; a short refill yields a partial result.
+ if( buffer->isMarked() ) {
- // If the buffer is now empty, reset the positions to the
- // head of the buffer.
- normalizeBuffer();
+ if( amount <= buffer->getMarkLimit() ) {
- // If we still haven't satisified the request,
- // read more data.
- if( totalSkipped < num ){
+ if( bufferData( inputStream ) == -1 ) {
+ return read;
+ }
- // Buffer as much data as we can.
- bufferData();
+ if( buffer->available() >= ( amount - read ) ) {
+ buffer->advance( amount - read );
+ return amount;
+ }
+
+ // Couldn't get all the bytes, skip what we read
+ read += buffer->available();
+ buffer->advance( buffer->getCount() );
+
+ return read;
}
}
// No mark, or `amount` exceeds the mark limit: delegate the remainder to the
// wrapped stream's own skip().
- // Return the total number of bytes read.
- return totalSkipped;
+ return read + inputStream->skip( amount - read );
}
DECAF_CATCH_RETHROW( IOException )
DECAF_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::bufferData() throw ( IOException ){
+int BufferedInputStream::bufferData( InputStream* inputStream ) throw ( IOException ){
try{
- if( getUnusedBytes() == 0 ){
- throw IOException(
- __FILE__, __LINE__,
- "BufferedInputStream::bufferData - buffer full" );
+
+ if( !buffer->isMarked() || buffer->isReadLimitExceeded() ) {
+ // Mark position not set or exceeded readlimit
+ int result = inputStream->read( buffer->getBuffer(), buffer->getBufferSize() );
+ if( result > 0 ) {
+ buffer->unmark();
+ buffer->clear();
+ buffer->advanceTail( result == -1 ? 0 : result );
+ }
+
+ return result;
}
- // Get the number of bytes currently available on the input stream
- // that could be read without blocking.
- std::size_t available = inputStream->available();
+ std::size_t markPos = buffer->getMarkPos();
+ std::size_t markLimit = buffer->getMarkLimit();
- // Calculate the number of bytes that we can read. Always >= 1 byte!
- std::size_t bytesToRead = max( (std::size_t)1, min( available, getUnusedBytes() ) );
+ if( markPos == 0 && markLimit > buffer->getBufferSize() ) {
- // Read the bytes from the input stream.
- int bytesRead = inputStream->read( buffer, bufferSize, tail, bytesToRead );
- if( bytesRead == 0 ){
- throw IOException(
- __FILE__, __LINE__,
- "BufferedInputStream::read() - failed reading bytes from stream");
+ // Increase buffer size to accommodate the readlimit.
+ std::size_t newLength = buffer->getBufferSize() * 2;
+ if( newLength > markLimit ) {
+ newLength = markLimit;
+ }
+ this->buffer->resize( newLength );
+ } else if( markPos > 0 ) {
+ System::arraycopy( buffer->getBuffer(), markPos,
+ buffer->getBuffer(), 0, buffer->getBufferSize() - markPos );
}
- // Dont add -1 to tail if we hit EOF
- if( bytesRead == -1 ) {
- return bytesRead;
- }
+ // Set the new position and mark position
+ buffer->reverse( markPos );
+ buffer->setCount( 0 );
+ buffer->setMarkPos( 0 );
+
+ int bytesread = inputStream->read( buffer->getBuffer(), buffer->getBufferSize(),
+ buffer->getPos(), buffer->getBufferSize() - buffer->getPos() );
- // Increment the tail to the new end position.
- tail += bytesRead;
+ buffer->setCount( bytesread <= 0 ? buffer->getPos() : buffer->getPos() + bytesread );
- return bytesRead;
+ return bytesread;
}
DECAF_CATCH_RETHROW( IOException )
DECAF_CATCHALL_THROW( IOException )
|