harmony-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tim Ellison <t.p.elli...@gmail.com>
Subject [classlib][luni] BufferedInputStream can not be closed in another thread (HARMONY-6014)
Date Thu, 13 Nov 2008 17:17:59 GMT
Nathan Beyer (JIRA) wrote:
> Can you rework the patch to NOT depend on catching a
> NullPointerException?

Yeah, it is hacky.

> Additionally, since the 'buf' is becoming volatile,

The spec [1] says that it should be volatile, so that is goodness.

> you should use the "copy-to-local" paradigm for that field.
> Essentially, copy the field to a method local variable, then work
> against that variable. You want to reduce the number of direct
> accesses to the volatile field.

Right.  Based on the JIRA report, our goal is now to remove the
synchronized keyword from close().  So we need to make the
unsynchronized closure visible to other threads, and deal with the
effects of an unsynchronized close().

[ Allowing an unsynchronized call to close() seems like a dumb idea to
me, but I guess we have to follow the RI behavior here ... ]

If we agree that we will be using 'buf=null' to indicate that the
buffered stream is closed, then why do we also have a 'closed' boolean?
I suggest we remove that.

Like you say, we must take a local copy of the volatile 'buf' in each
method that uses it, and work with that even if after the stream is closed.

However, close() also has the side effect of closing the source 'in'
stream, so we have to be prepared for that to be null at any
(unsynchronized) point too.  The 'in' variable needs to be declared
volatile in FilterInputStream (as per spec too) -- but the problem is
that, unlike buf, simply holding a local reference to the stream doesn't
help if somebody changes it's state to closed since we can't continue to
rely on skip() and available() etc. working.

This is messy.  I don't see how we can get a consistent state on 'in'?
I'm sure a synchronized close() would have been fine ;-/

[1] http://java.sun.com/j2se/1.5.0/docs/api/java/io/BufferedInputStream.html

Regards,
Tim

-- work in progress

Index: BufferedInputStream.java
===================================================================
--- BufferedInputStream.java	(revision 711277)
+++ BufferedInputStream.java	(working copy)
@@ -32,7 +32,7 @@
     /**
      * The buffer containing the current bytes read from the target
InputStream.
      */
-    protected byte[] buf;
+    protected volatile byte[] buf;

     /**
      * The total number of bytes inside the byte array <code>buf</code>.
@@ -55,8 +55,6 @@
      */
     protected int pos;

-    private boolean closed = false;
-
     /**
      * Constructs a new <code>BufferedInputStream</codeon the InputStream
      * <code>in</code>. The default buffer size (8Kb) is allocated and all
@@ -105,6 +103,7 @@
             // K0059=Stream is closed
             throw new IOException(Msg.getString("K0059")); //$NON-NLS-1$
         }
+        // FIXME: 'in' could be invalidated by close()
         return count - pos + in.available();
     }

@@ -116,19 +115,19 @@
      *             If an error occurs attempting to close this stream.
      */
     @Override
-    public synchronized void close() throws IOException {
+    public void close() throws IOException {
+        buf = null;
         if (null != in) {
             super.close();
             in = null;
         }
-        buf = null;
-        closed = true;
     }

-    private int fillbuf() throws IOException {
+    // FIXME: 'in' could be invalidated by close()
+    private int fillbuf(byte[] buffer) throws IOException {
         if (markpos == -1 || (pos - markpos >= marklimit)) {
             /* Mark position not set or exceeded readlimit */
-            int result = in.read(buf);
+            int result = in.read(buffer);
             if (result 0) {
                 markpos = -1;
                 pos = 0;
@@ -136,22 +135,22 @@
             }
             return result;
         }
-        if (markpos == 0 && marklimit buf.length) {
-            /* Increase buffer size to accomodate the readlimit */
-            int newLength = buf.length * 2;
+        if (markpos == 0 && marklimit buffer.length) {
+            /* Increase buffer size to accommodate the readlimit */
+            int newLength = buffer.length * 2;
             if (newLength marklimit) {
                 newLength = marklimit;
             }
             byte[] newbuf = new byte[newLength];
-            System.arraycopy(buf, 0, newbuf, 0, buf.length);
-            buf = newbuf;
+            System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
+            buffer = newbuf;
         } else if (markpos 0) {
-            System.arraycopy(buf, markpos, buf, 0, buf.length - markpos);
+            System.arraycopy(buffer, markpos, buffer, 0, buffer.length
- markpos);
         }
         /* Set the new position and mark position */
         pos -= markpos;
         count = markpos = 0;
-        int bytesread = in.read(buf, pos, buf.length - pos);
+        int bytesread = in.read(buffer, pos, buffer.length - pos);
         count = bytesread <= 0 ? pos : pos + bytesread;
         return bytesread;
     }
@@ -200,19 +199,20 @@
      */
     @Override
     public synchronized int read() throws IOException {
-        if (in == null) {
+        byte[] localBuf = buf;    // buf may be invalidated by an
unsynchronized close()
+        if (localBuf == null) {
             // K0059=Stream is closed
             throw new IOException(Msg.getString("K0059")); //$NON-NLS-1$
         }

         /* Are there buffered bytes available? */
-        if (pos >= count && fillbuf() == -1) {
+        if (pos >= count && fillbuf(localBuf) == -1) {
             return -1; /* no, fill buffer */
         }

         /* Did filling the buffer fail with -1 (EOF)? */
         if (count - pos 0) {
-            return buf[pos++] & 0xFF;
+            return localBuf[pos++] & 0xFF;
         }
         return -1;
     }
@@ -239,10 +239,12 @@
      *             If the stream is already closed or another IOException
      *             occurs.
      */
+    // FIXME: 'in' may be invalidated by close()
     @Override
     public synchronized int read(byte[] buffer, int offset, int length)
             throws IOException {
-        if (closed) {
+        byte[] localBuf = buf;    // buf may be invalidated by an
unsynchronized close()
+        if (localBuf == null) {
             // K0059=Stream is closed
             throw new IOException(Msg.getString("K0059")); //$NON-NLS-1$
         }
@@ -253,15 +255,12 @@
         if (length == 0) {
             return 0;
         }
-        if (null == buf) {
-            throw new IOException(Msg.getString("K0059")); //$NON-NLS-1$
-        }

         int required;
         if (pos < count) {
             /* There are bytes available in the buffer. */
             int copylength = count - pos >= length ? length : count - pos;
-            System.arraycopy(buf, pos, buffer, offset, copylength);
+            System.arraycopy(localBuf, pos, buffer, offset, copylength);
             pos += copylength;
             if (copylength == length || in.available() == 0) {
                 return copylength;
@@ -278,17 +277,17 @@
              * If we're not marked and the required size is greater
than the
              * buffer, simply read the bytes directly bypassing the buffer.
              */
-            if (markpos == -1 && required >= buf.length) {
+            if (markpos == -1 && required >= localBuf.length) {
                 read = in.read(buffer, offset, required);
                 if (read == -1) {
                     return required == length ? -1 : length - required;
                 }
             } else {
-                if (fillbuf() == -1) {
+                if (fillbuf(localBuf) == -1) {
                     return required == length ? -1 : length - required;
                 }
                 read = count - pos >= required ? required : count - pos;
-                System.arraycopy(buf, pos, buffer, offset, read);
+                System.arraycopy(localBuf, pos, buffer, offset, read);
                 pos += read;
             }
             required -= read;
@@ -315,7 +314,7 @@

     @Override
     public synchronized void reset() throws IOException {
-        if (closed) {
+        if (buf == null) {
             // K0059=Stream is closed
             throw new IOException(Msg.getString("K0059")); //$NON-NLS-1$	
         }
@@ -341,7 +340,8 @@
      */
     @Override
     public synchronized long skip(long amount) throws IOException {
-        if (null == in) {
+        byte[] localBuf = buf;    // buf may be invalidated by an
unsynchronized close()
+        if (localBuf == null) {
             // K0059=Stream is closed
             throw new IOException(Msg.getString("K0059")); //$NON-NLS-1$
         }
@@ -358,7 +358,7 @@

         if (markpos != -1) {
             if (amount <= marklimit) {
-                if (fillbuf() == -1) {
+                if (fillbuf(localBuf) == -1) {
                     return read;
                 }
                 if (count - pos >= amount - read) {
@@ -372,6 +372,7 @@
             }
             markpos = -1;
         }
+        // FIXME: 'in' may be invalidated by close()
         return read + in.skip(amount - read);
     }
 }

Mime
View raw message