activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r915855 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io: BufferedInputStream.cpp BufferedInputStream.h FilterInputStream.cpp FilterOutputStream.cpp
Date Wed, 24 Feb 2010 16:25:48 GMT
Author: tabish
Date: Wed Feb 24 16:25:48 2010
New Revision: 915855

URL: http://svn.apache.org/viewvc?rev=915855&view=rev
Log:
Fixes to make the buffered stream more Thread safe when closed asynchronously while in blocked
reads.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp?rev=915855&r1=915854&r2=915855&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp Wed Feb 24 16:25:48 2010
@@ -169,7 +169,7 @@
 : FilterInputStream( stream, own ) {
 
     // Default to a 8k buffer.
-    this->buffer = new StreamBuffer( 8192 );
+    this->buffer.reset( new StreamBuffer( 8192 ) );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -182,14 +182,13 @@
             "BufferedInputStream::init - Size must be greater than zero");
     }
 
-    this->buffer = new StreamBuffer( bufferSize );
+    this->buffer.reset( new StreamBuffer( bufferSize ) );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 BufferedInputStream::~BufferedInputStream() {
     try{
         this->close();
-        delete this->buffer;
     }
     DECAF_CATCH_NOTHROW( IOException )
     DECAF_CATCHALL_NOTHROW()
@@ -200,6 +199,10 @@
 
     // let parent close the inputStream
     FilterInputStream::close();
+
+    // Free the class reference, read operation may still be
+    // holding onto the buffer while blocked.
+    this->buffer.release();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -224,7 +227,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 void BufferedInputStream::reset() throw ( IOException ) {
 
-    if( this->isClosed() ) {
+    if( this->buffer == NULL ) {
         throw IOException(
             __FILE__, __LINE__,
             "BufferedInputStream::reset - This stream has been closed." );
@@ -246,27 +249,28 @@
 
         // Use a local reference in case of unsynchronized close.
         InputStream* inputStream = this->inputStream;
+        Pointer<StreamBuffer> buffer = this->buffer;
 
-        if( isClosed() || inputStream == NULL ){
+        if( isClosed() || buffer == NULL ){
             throw IOException(
                 __FILE__, __LINE__,
                 "BufferedInputStream::bufferData - Stream is closed" );
         }
 
         // Are there buffered bytes available?  Or can we read more?
-        if( this->buffer->isEmpty() && bufferData( inputStream ) == -1 ) {
+        if( buffer->isEmpty() && bufferData( inputStream, buffer ) == -1 ) {
             return -1;
         }
 
         // Stream might have closed while we were buffering.
-        if( isClosed() ){
+        if( isClosed() ) {
             throw IOException(
                 __FILE__, __LINE__,
                 "BufferedInputStream::bufferData - Stream is closed" );
         }
 
-        if( !this->buffer->isEmpty() ) {
-            return this->buffer->next();
+        if( !buffer->isEmpty() ) {
+            return buffer->next();
         }
 
         return -1;
@@ -285,9 +289,9 @@
     try{
 
         // Use a local reference in case of unsynchronized close.
-        InputStream* inputStream = this->inputStream;
+        Pointer<StreamBuffer> streamBuffer = this->buffer;
 
-        if( isClosed() ){
+        if( streamBuffer == NULL ){
             throw IOException(
                 __FILE__, __LINE__, "Stream is closed" );
         }
@@ -309,6 +313,9 @@
             return 0;
         }
 
+        // Use a local reference in case of unsynchronized close.
+        InputStream* inputStream = this->inputStream;
+
         if( inputStream == NULL ){
             throw IOException(
                 __FILE__, __LINE__, "Stream is closed" );
@@ -319,14 +326,14 @@
         // There are bytes available in the buffer so use them up first and
         // then we check to see if any are available on the stream, if not
         // then just return what we had.
-        if( !this->buffer->isEmpty() ) {
+        if( !streamBuffer->isEmpty() ) {
 
             std::size_t copylength =
-                this->buffer->available() >= length ? length : this->buffer->available();
+                streamBuffer->available() >= length ? length : streamBuffer->available();
 
-            System::arraycopy( this->buffer->getBuffer(), this->buffer->getPos(),
+            System::arraycopy( streamBuffer->getBuffer(), streamBuffer->getPos(),
                                buffer, offset, copylength );
-            this->buffer->advance( copylength );
+            streamBuffer->advance( copylength );
 
             if( copylength == length || inputStream->available() == 0 ) {
                 return copylength;
@@ -353,21 +360,21 @@
 
             } else {
 
-                if( bufferData( inputStream ) == -1 ) {
+                if( bufferData( inputStream, streamBuffer ) == -1 ) {
                     return required == length ? -1 : length - required;
                 }
 
                 // Stream might have closed while we were buffering.
-                if( isClosed() ){
+                if( isClosed() || this->buffer == NULL ){
                     throw IOException(
                         __FILE__, __LINE__,
                         "BufferedInputStream::bufferData - Stream is closed" );
                 }
 
-                read = this->buffer->available() >= required ? required : this->buffer->available();
-                System::arraycopy( this->buffer->getBuffer(), this->buffer->getPos(),
+                read = streamBuffer->available() >= required ? required : streamBuffer->available();
+                System::arraycopy( streamBuffer->getBuffer(), streamBuffer->getPos(),
                                    buffer, offset, read );
-                this->buffer->advance( read );
+                streamBuffer->advance( read );
             }
 
             required -= read;
@@ -401,38 +408,39 @@
 
         // Use a local reference in case of unsynchronized close.
         InputStream* inputStream = this->inputStream;
+        Pointer<StreamBuffer> streamBuffer = this->buffer;
 
-        if( isClosed() || inputStream == NULL ){
+        if( isClosed() || streamBuffer == NULL ){
             throw IOException(
                 __FILE__, __LINE__,
-                "BufferedInputStream::bufferData - Stream is closed" );
+                "BufferedInputStream::skip - Stream is closed" );
         }
 
-        if( buffer->available() >= amount ) {
-            buffer->advance( amount );
+        if( streamBuffer->available() >= amount ) {
+            streamBuffer->advance( amount );
             return amount;
         }
 
-        int read = buffer->available();
+        int read = streamBuffer->available();
 
-        buffer->advance( buffer->getCount() );
+        streamBuffer->advance( streamBuffer->getCount() );
 
-        if( buffer->isMarked() ) {
+        if( streamBuffer->isMarked() ) {
 
-            if( amount <= buffer->getMarkLimit() ) {
+            if( amount <= streamBuffer->getMarkLimit() ) {
 
-                if( bufferData( inputStream ) == -1 ) {
+                if( bufferData( inputStream, streamBuffer ) == -1 ) {
                     return read;
                 }
 
-                if( buffer->available() >= ( amount - read ) ) {
-                    buffer->advance( amount - read );
+                if( streamBuffer->available() >= ( amount - read ) ) {
+                    streamBuffer->advance( amount - read );
                     return amount;
                 }
 
                 // Couldn't get all the bytes, skip what we read
-                read += buffer->available();
-                buffer->advance( buffer->getCount() );
+                read += streamBuffer->available();
+                streamBuffer->advance( streamBuffer->getCount() );
 
                 return read;
             }
@@ -445,7 +453,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::bufferData( InputStream* inputStream ) throw ( IOException ){
+int BufferedInputStream::bufferData( InputStream* inputStream, Pointer<StreamBuffer>& buffer )
+    throw ( decaf::io::IOException ){
 
     try{
 
@@ -471,7 +480,7 @@
             if( newLength > markLimit ) {
                 newLength = markLimit;
             }
-            this->buffer->resize( newLength );
+            buffer->resize( newLength );
         } else if( markPos > 0 ) {
             System::arraycopy( buffer->getBuffer(), markPos,
                                buffer->getBuffer(), 0, buffer->getBufferSize() - markPos );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h?rev=915855&r1=915854&r2=915855&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h Wed Feb 24 16:25:48 2010
@@ -21,6 +21,7 @@
 #include <decaf/util/Config.h>
 #include <decaf/io/FilterInputStream.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/lang/Pointer.h>
 
 namespace decaf{
 namespace io{
@@ -36,7 +37,10 @@
     class DECAF_API BufferedInputStream : public FilterInputStream {
     private:
 
-        StreamBuffer* buffer;
+        // Internal data buffer, uses a smart pointer so that async close
+        // operations allow read methods fail gracefully instead of segfaulting
+        // on access to invalid memory.
+        decaf::lang::Pointer<StreamBuffer> buffer;
 
     public:
 
@@ -161,7 +165,8 @@
 
     private:
 
-        int bufferData( InputStream* stream ) throw ( decaf::io::IOException );
+        int bufferData( InputStream* stream, decaf::lang::Pointer<StreamBuffer>& buffer )
+            throw ( decaf::io::IOException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.cpp?rev=915855&r1=915854&r2=915855&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterInputStream.cpp Wed Feb 24 16:25:48 2010
@@ -179,5 +179,5 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FilterInputStream::isClosed() const {
-    return this->closed;
+    return this->closed || this->inputStream == NULL;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.cpp?rev=915855&r1=915854&r2=915855&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/FilterOutputStream.cpp Wed Feb 24 16:25:48 2010
@@ -29,7 +29,7 @@
 FilterOutputStream::FilterOutputStream( OutputStream* outputStream, bool own ){
     this->outputStream = outputStream;
     this->own = own;
-    this->closed = false;
+    this->closed = outputStream == NULL ? true : false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////



Mime
View raw message