db-derby-commits mailing list archives

From krist...@apache.org
Subject svn commit: r724294 - in /db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc: ClobUpdatableReader.java EmbedClob.java StoreStreamClob.java TemporaryClob.java UTF8Reader.java
Date Mon, 08 Dec 2008 09:08:28 GMT
Author: kristwaa
Date: Mon Dec  8 01:08:27 2008
New Revision: 724294

URL: http://svn.apache.org/viewvc?rev=724294&view=rev
Log:
DERBY-3934 (partial): Improve performance of reading modified Clobs.
The following files are touched (all in derby.impl.jdbc):
*** EmbedClob
 Updated call to ClobUpdatableReader. The change of the position argument is
 intentional.

*** TemporaryClob
 Replaced the ClobUpdatableReader returned by getReader with a UTF8Reader.
 Internal handling of TemporaryClob should deal with changing contents
 specifically, or create a ClobUpdatableReader where required.
 Note also the use of the new CharacterStreamDescriptor class. This piece of
 code will probably be changed later on, when there is more information about
 the stream available. For instance, caching byte/char positions allows us to
 skip directly to the byte position through the underlying file API. This way,
 we don't have to decode all the raw bytes to skip the correct number of chars.
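
 The position-caching idea can be sketched as follows. This is a minimal
 illustration, not code from the patch: CharToByteIndex is a hypothetical
 class, it works on an in-memory byte array rather than the file API, and it
 assumes standard UTF-8 where every character takes 1 to 3 bytes (no
 supplementary characters).

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (not part of Derby): maps character positions to byte
 * offsets in UTF-8 data and remembers the last position it resolved.
 * Assumes every character is encoded in 1 to 3 bytes.
 */
final class CharToByteIndex {
    private final byte[] utf8;
    private long cachedCharPos = 0; // characters preceding cachedBytePos
    private int cachedBytePos = 0;  // byte offset of character cachedCharPos
    private long charsDecoded = 0;  // bookkeeping for the demo only

    CharToByteIndex(byte[] utf8) {
        this.utf8 = utf8;
    }

    /** Returns the byte offset of the given 0-based character index. */
    int byteOffsetOf(long charIndex) {
        long c = 0;
        int b = 0;
        // Resume from the cached pair when the target is not behind it.
        if (charIndex >= cachedCharPos) {
            c = cachedCharPos;
            b = cachedBytePos;
        }
        while (c < charIndex && b < utf8.length) {
            b += sequenceLength(utf8[b]);
            c++;
            charsDecoded++;
        }
        cachedCharPos = c;
        cachedBytePos = b;
        return b;
    }

    long charsDecoded() {
        return charsDecoded;
    }

    /** Length in bytes of the UTF-8 sequence starting with the lead byte. */
    private static int sequenceLength(byte lead) {
        int v = lead & 0xFF;
        if (v < 0x80) return 1;
        if ((v & 0xE0) == 0xC0) return 2;
        if ((v & 0xF0) == 0xE0) return 3;
        throw new IllegalArgumentException("Unsupported lead byte: " + v);
    }

    public static void main(String[] args) {
        // 'a' (1 byte), U+00E9 (2 bytes), U+20AC (3 bytes), 'b' (1 byte)
        byte[] data = "a\u00e9\u20acb".getBytes(StandardCharsets.UTF_8);
        CharToByteIndex index = new CharToByteIndex(data);
        System.out.println(index.byteOffsetOf(3)); // walks three characters
        System.out.println(index.byteOffsetOf(4)); // walks one more character
        System.out.println(index.charsDecoded());
    }
}
```

 The second lookup resumes from the cached pair instead of starting at the
 first byte, which is the saving the log message refers to. A lookup behind
 the cached position falls back to scanning from the start in this sketch.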

*** ClobUpdatableReader
 More or less rewritten. It now uses the new methods exposed by InternalClob to
 detect changes in the underlying Clob content. Note that this class doesn't
 handle repositioning, only detection of changes and forwarding of read/skip
 calls.
 Note the lazy initialization of the underlying reader.
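
 The change detection and lazy initialization described above can be sketched
 in isolation. The names below (VersionedText, ChangeAwareReader,
 ChangeDetectionDemo) are hypothetical stand-ins and not Derby classes; the
 sketch assumes the source exposes an update counter and can hand out a reader
 positioned at a given 1-based character position.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the internal Clob representation. */
final class VersionedText {
    private final StringBuilder content;
    private long updateCount = 0;

    VersionedText(String initial) {
        this.content = new StringBuilder(initial);
    }

    /** Overwrites characters starting at the given 1-based position. */
    void overwrite(long pos, String str) {
        int start = (int) (pos - 1);
        int end = Math.min(content.length(), start + str.length());
        content.replace(start, end, str);
        updateCount++; // every modification bumps the counter
    }

    long getUpdateCount() {
        return updateCount;
    }

    /** Returns a reader positioned at the given 1-based character position. */
    Reader getReader(long pos) throws IOException {
        Reader r = new StringReader(content.toString());
        r.skip(pos - 1); // in-memory reader, one skip call is enough here
        return r;
    }
}

/** Detects changes and forwards reads; positioning is left to the source. */
final class ChangeAwareReader extends Reader {
    private final VersionedText source;
    private Reader delegate;           // created lazily on first use
    private long pos = 1;              // 1-based character position
    private long lastUpdateCount = -1; // never matches, forces the first init

    ChangeAwareReader(VersionedText source) {
        this.source = source;
    }

    private void updateReaderIfRequired() throws IOException {
        long current = source.getUpdateCount();
        if (lastUpdateCount != current) {
            lastUpdateCount = current;
            delegate = source.getReader(pos); // fresh reader at our position
        }
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        updateReaderIfRequired();
        int n = delegate.read(cbuf, off, len);
        if (n > 0) {
            pos += n;
        }
        return n;
    }

    @Override
    public void close() throws IOException {
        if (delegate != null) {
            delegate.close();
        }
    }
}

final class ChangeDetectionDemo {
    /** Reads three chars, modifies the source, then reads three more. */
    static String run() {
        VersionedText text = new VersionedText("abcdef");
        char[] buf = new char[3];
        try (ChangeAwareReader reader = new ChangeAwareReader(text)) {
            reader.read(buf, 0, 3);
            String before = new String(buf);
            text.overwrite(4, "XYZ"); // change the part not yet read
            reader.read(buf, 0, 3);
            return before + "|" + new String(buf);
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }
}
```

 Starting the stored counter at -1 means the first read always takes the
 "changed" path, so no reader is created until it is actually needed.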

 WARNING: There is one thing missing, which is proper synchronization. Access to
 store will be synchronized in other locations, but this class is not thread
 safe. I haven't decided yet whether to synchronize on the reader object or the
 root connection. I think the latter is the best choice. Does anyone know
 anything about the cost of taking locks on the same object multiple times?
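
 On the mechanics behind that question: Java's intrinsic locks are reentrant,
 so a thread that already holds the monitor of an object can enter further
 synchronized blocks on the same object without blocking. The sketch below
 only demonstrates that behaviour and says nothing about its cost.
 SharedLockReader and the plain Object used as the lock are hypothetical
 stand-ins, not Derby classes.

```java
/** Hypothetical reader that guards its state with a shared lock object. */
final class SharedLockReader {
    private final Object connectionLock; // stand-in for the root connection
    private final String data;
    private int pos = 0;

    SharedLockReader(Object connectionLock, String data) {
        this.connectionLock = connectionLock;
        this.data = data;
    }

    /** Returns the next character, or -1 at the end of the data. */
    int read() {
        synchronized (connectionLock) {
            return pos < data.length() ? data.charAt(pos++) : -1;
        }
    }

    /** Reads everything while holding the lock across all read() calls. */
    String readAll() {
        StringBuilder sb = new StringBuilder();
        synchronized (connectionLock) {
            int c;
            // Each read() re-acquires the lock this thread already holds.
            while ((c = read()) != -1) {
                sb.append((char) c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Object lock = new Object();
        SharedLockReader reader = new SharedLockReader(lock, "clob");
        System.out.println(reader.readAll());
        System.out.println(Thread.holdsLock(lock)); // lock released again
    }
}
```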

*** StoreStreamClob
 Replaced old UTF8Reader constructor with the new one. Again, this code needs
 to be updated when more information about the stream is available. This is to
 allow UTF8Reader to perform better.

*** UTF8Reader
 Added a new constructor, using the new CharacterStreamDescriptor class.
 Removed one constructor.
 Retrofitted the second old constructor to use CharacterStreamDescriptor. This
 will be removed when the calling code has been updated.
 The old method calculating the buffer size will also be removed.
 Stopped referencing PositionedStoreStream, using PositionedStream interface
 instead. This allows the positioning logic to be used for both store streams
 and LOBInputStream streams.
 The reader has been prepared to be able to deal with multiple data offsets,
 i.e. handling several store stream formats. For instance, the current
 implementation has an offset of two bytes, whereas the planned new one will
 have an offset of at least five bytes. LOBInputStream has an offset of zero
 bytes (no header information).
 From now on, position-aware streams are not closed as early as before, because
 we might have to go backwards in the stream. Streams that can only move
 forwards are closed as soon as possible (as before).
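
 The combination of a configurable data offset and delayed closing can be
 illustrated with a small sketch. SeekableBytes and PayloadReader are
 hypothetical names and the header contents are made up. The real UTF8Reader
 decodes characters rather than raw bytes, so this only shows the offset
 handling and the decision whether to close at end of stream.

```java
import java.io.InputStream;

/** Hypothetical in-memory stream that can move to an absolute position. */
final class SeekableBytes extends InputStream {
    private final byte[] data;
    private int pos = 0;
    private boolean closed = false;

    SeekableBytes(byte[] data) {
        this.data = data;
    }

    @Override
    public int read() {
        return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }

    void reposition(long newPos) {
        pos = (int) newPos;
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Reads the payload that follows a header of dataOffset bytes. */
final class PayloadReader {
    private final SeekableBytes in;
    private final long dataOffset;
    private final boolean positionAware;

    PayloadReader(SeekableBytes in, long dataOffset, boolean positionAware) {
        this.in = in;
        this.dataOffset = dataOffset;
        this.positionAware = positionAware;
        for (long i = 0; i < dataOffset; i++) {
            in.read(); // consume header bytes; the payload starts after them
        }
    }

    /** Returns the next payload byte, or -1 at the end of the stream. */
    int read() {
        int b = in.read();
        if (b == -1 && !positionAware) {
            in.close(); // forward-only: the stream cannot be reused
        }
        return b;
    }

    /** Goes back to the first payload byte; needs a position-aware stream. */
    void rewind() {
        if (!positionAware) {
            throw new IllegalStateException("Stream cannot be repositioned");
        }
        in.reposition(dataOffset);
    }

    /** Reads the rest of the payload as single-byte characters. */
    String readRemaining() {
        StringBuilder sb = new StringBuilder();
        int b;
        while ((b = read()) != -1) {
            sb.append((char) b);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] shortHdr = {0, 2, 'h', 'i'};
        byte[] longHdr = {9, 9, 9, 9, 9, 'h', 'i'};
        PayloadReader a = new PayloadReader(new SeekableBytes(shortHdr), 2, true);
        PayloadReader b = new PayloadReader(new SeekableBytes(longHdr), 5, true);
        System.out.println(a.readRemaining() + " " + b.readRemaining());
        a.rewind(); // the stream was kept open at EOF, so we can go back
        System.out.println(a.readRemaining());
    }
}
```

 Because the offset is a parameter rather than a hard-coded constant, the same
 reader logic serves both header layouts, and rewinding always lands on the
 first payload byte.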

Note that this patch doesn't fix the most serious performance issue. This will
be done in a follow-up patch by implementing getInternalReader in TemporaryClob.

Patch file: derby-3934-3a-clobupdreader_utf8reader.diff


Modified:
    db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/ClobUpdatableReader.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/EmbedClob.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/StoreStreamClob.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/TemporaryClob.java
    db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/UTF8Reader.java

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/ClobUpdatableReader.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/ClobUpdatableReader.java?rev=724294&r1=724293&r2=724294&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/ClobUpdatableReader.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/ClobUpdatableReader.java Mon Dec  8 01:08:27 2008
@@ -22,209 +22,194 @@
  */
 package org.apache.derby.impl.jdbc;
 
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.Reader;
 import java.sql.SQLException;
-import org.apache.derby.iapi.reference.SQLState;
-import org.apache.derby.iapi.services.i18n.MessageService;
 import org.apache.derby.iapi.services.sanity.SanityManager;
 
 /**
- * <code>ClobUpdatableReader</code> is used to create a <code>Reader</code>
- * over a <code>LOBInputStream</code>.
+ * {@code ClobUpdatableReader} is used to create a {@code Reader} capable of
+ * detecting changes to the underlying source.
  * <p>
  * This class is aware that the underlying stream can be modified and
  * reinitializes itself if it detects any change in the stream. This
  * invalidates the cache so the changes are reflected immediately.
- * 
- * @see LOBInputStream
+ * <p>
+ * The task of this class is to detect changes in the underlying Clob.
+ * Repositioning is handled by other classes.
  */
 final class ClobUpdatableReader extends Reader {
-    
+ 
     /** Reader accessing the Clob data and doing the work. */
     private Reader streamReader;
-    /** Character position of this reader. */
+    /** Character position of this reader (1-based). */
     private long pos;
-    /** Underlying stream of byte data. */
-    private InputStream stream = null;
-    /** Connection object used to obtain synchronization-object. */
-    private ConnectionChild conChild;
-    /** flag to indicate if its associated with materialized clob */
-    private boolean materialized;
-    /** clob object this object is associated */
+    /** The last update count seen on the underlying Clob. */
+    private long lastUpdateCount = -1;
+    /**
+     * The Clob object we are reading from.
+     * <p>
+     * Note that even though the Clob itself is final, the internal
+     * representation of the content may change. The reference to the Clob is
+     * needed to get a hold of the new internal representation if it is changed.
+     *
+     * @see #iClob
+     */
     private final EmbedClob clob;
     /**
+     * The current internal representation of the Clob content.
+     * <p>
+     * If the user starts out with a read-only Clob and then modifies it, the
+     * internal representation will change.
+     */
+    private InternalClob iClob;
+    /**
      * Position in Clob where to stop reading unless EOF is reached first.
      */
     private final long maxPos;
-    
-    
+    /** Tells if this reader has been closed. */
+    private volatile boolean closed = false;
+
     /**
-     * Constructs a <code>Reader</code> over a <code>LOBInputStream</code>.
-     * @param stream underlying stream of byte data
-     * @param conChild a connection object used to obtain synchronization-object
-     * @throws IOException
-     */
-    ClobUpdatableReader (LOBInputStream stream, ConnectionChild conChild)
-                                                        throws IOException {
-        clob = null;
-        materialized = true;
-        this.conChild = conChild;
-        this.stream = stream;
-        //The subset of the Clob has not been requested. 
-        //Hence set maxPos to infinity (or as close as we get).
-        this.maxPos = Long.MAX_VALUE;
-
-        init (stream, 0);
+     * Creates an updatable reader configured with initial position set to the
+     * first character in the Clob and with no imposed length limit.
+     *
+     * @param clob source data
+     * @throws IOException if obtaining the underlying reader fails
+     * @throws SQLException if obtaining the underlying reader fails
+     */
+    public ClobUpdatableReader(EmbedClob clob)
+            throws IOException, SQLException {
+        this(clob, 1L, Long.MAX_VALUE);
     }
 
     /**
-     * Constructs a <code>Reader</code> over a <code>LOBInputStream</code>.
-     * @param clob EmbedClob this Reader is associated to.
-     * @throws IOException
-     * @throws SQLException
-     */
-    ClobUpdatableReader (EmbedClob clob) throws IOException, SQLException {
-        // A subset of the Clob has not been requested.
-        // Hence set length to infinity (or as close as we get).
-        this(clob, 0L, Long.MAX_VALUE);
-    }
-    
-    /**
-     * Construct an <code>ClobUpdatableReader<code> using the 
-     * <code>EmbedClob</code> received as parameter. The initial
-     * position in the stream is set to <code>pos</code> and the
-     * stream is restricted to a length of <code>len</code>.
-     * 
-     * @param clob EmbedClob this stream is associated with.
-     * @param pos initial position. The position starts from 0.
-     * @param len The length to which the underlying <code>InputStream</code>
-     *            has to be restricted.
-     * @throws IOException
-     * @throws SQLException
-     */
-    ClobUpdatableReader (EmbedClob clob, long pos, long len) 
-    throws IOException, SQLException {
+     * Creates an updatable reader configured with the specified initial
+     * position and with an imposed length limit.
+     *
+     * @param clob source data
+     * @param initialPos the first character that will be read
+     * @param length the maximum number of characters that will read
+     * @throws IOException if obtaining the underlying reader fails
+     * @throws SQLException if obtaining the underlying reader fails
+     */
+    public ClobUpdatableReader(EmbedClob clob, long initialPos, long length)
+            throws IOException, SQLException {
+        if (SanityManager.DEBUG) {
+            SanityManager.ASSERT(initialPos > 0);
+            SanityManager.ASSERT(length > 0);
+        }
         this.clob = clob;
-        this.conChild = clob;
-        this.maxPos = pos + len;
+        this.iClob = clob.getInternalClob();
+        this.pos = initialPos;
+        // Temporary computation due to possible overflow.
+        long tmpMaxPos = initialPos + length; // May overflow
+        if (tmpMaxPos < length || tmpMaxPos < initialPos) {
+            tmpMaxPos = Long.MAX_VALUE;
+        }
+        this.maxPos = tmpMaxPos;
+    }
 
-        InternalClob internalClob = clob.getInternalClob();
-        materialized = internalClob.isWritable();        
-        if (materialized) {
-            this.stream = internalClob.getRawByteStream();
-            // Position the stream on pos using the init method.
-            init ((LOBInputStream)stream, pos);
-        } else {
-            if (SanityManager.DEBUG) {
-                SanityManager.ASSERT(internalClob instanceof StoreStreamClob,
-                        "Wrong type of internal clob representation: " +
-                        internalClob.toString());
-            }
-            // Since this representation is read-only, the stream never has to
-            // update itself, until the Clob representation itself has been
-            // changed. That even will be detected by {@link #updateIfRequired}.
-            this.streamReader = internalClob.getReader(pos + 1);
-            this.pos = pos;
-        }
-    }
-        
-    /**
-     * Reads chars into the cbuf char array. Changes made in uderlying storage 
-     * will be reflected immidiatly from the corrent position.
-     * @param cbuf buffer to read into
-     * @param off offet of the cbuf array to start writing read chars
-     * @param len number of chars to be read
-     * @return number of bytes read
-     * @throws IOException
-     */
-    public int read(char[] cbuf, int off, int len) throws IOException {        
-        updateIfRequired();
-        
-        //If the stream has exceeded maxPos the read should return -1
-        //signifying end of stream.
+    public int read() throws IOException {
+        if (closed) {
+            throw new IOException("Reader closed");
+        }
         if (pos >= maxPos) {
             return -1;
         }
+        updateReaderIfRequired();
+        // Adjust length if required, read data and update position.
+        int retVal = this.streamReader.read();
+        if (retVal > 0) {
+            this.pos++;
+        }
+        return retVal;
+    }
 
-        int actualLength = (int) Math.min(len, maxPos - pos);
-        int ret = streamReader.read (cbuf, off, actualLength);
-        if (ret >= 0) {
-            pos += ret;
+    public int read(char[] cbuf, int off, int len) throws IOException {
+        if (closed) {
+            throw new IOException("Reader closed");
         }
-        return ret;
+        if (pos >= maxPos) {
+            return -1;
+        }
+        updateReaderIfRequired();
+        // Adjust length if required, read data and update position.
+        int adjustedLen = (int)Math.min(len, maxPos - pos);
+        int readCount = this.streamReader.read(cbuf, off, adjustedLen);
+        if (readCount > 0) {
+            this.pos += readCount;
+        }
+        return readCount;
+    }
+
+    public long skip(long len) throws IOException {
+        if (closed) {
+            throw new IOException("Reader closed");
+        }
+        if (pos >= maxPos) {
+            return 0;
+        }
+        updateReaderIfRequired();
+        // Adjust length if required, skip data and update position.
+        long adjustedLen = Math.min(len, maxPos - pos);
+        long skipped = this.streamReader.skip(adjustedLen);
+        if (skipped > 0) {
+            this.pos += skipped;
+        }
+        return skipped;
     }
 
     /**
-     * Closes the reader.
-     * @throws IOException
+     * Closes this reader.
+     * <p>
+     * An {@code IOException} will be thrown if any of the read or skip methods
+     * are called after the reader has been closed.
+     *
+     * @throws IOException if an error occurs while closing
      */
     public void close() throws IOException {
-        streamReader.close();
-    }
-    
-    /**
-     * Initializes the streamReader and skips to the given position.
-     * @param skip number of characters to skip to reach initial position
-     * @throws IOException if a streaming error occurs
-     */
-    private void init(LOBInputStream stream, long skip) 
-                                                    throws IOException {
-        streamReader = new UTF8Reader (stream, 0, stream.length (), 
-                                        conChild, 
-                                conChild.getConnectionSynchronization());
-        long remainToSkip = skip;
-        while (remainToSkip > 0) {
-            long skipBy = streamReader.skip(remainToSkip);
-            if (skipBy == 0) {
-                if (streamReader.read() == -1) {
-                    throw new EOFException (
-                                 MessageService.getCompleteMessage (
-                                 SQLState.STREAM_EOF, new Object [0]));
-                }
-                skipBy = 1;
+        if (!closed) {
+            closed = true;
+            // Can be null if the stream is created and closed immediately.
+            if (this.streamReader != null) {
+                this.streamReader.close();
             }
-            remainToSkip -= skipBy;
         }
-        pos = skip;
-    }    
+    }
 
     /**
-     * Updates the stream if underlying clob is modified since
-     * this reader was created. 
-     * If the stream is associated with a materialized clob, it 
-     * checks if the underlying clob is updated since last read and 
-     * updates itself if it is. If the stream is associated with 
-     * non materialized clob and clob is materialized since last read it 
-     * fetches the stream again and sets the position to current position.
-     * @throws IOException
-     */
-    private void updateIfRequired () throws IOException {
-        if (materialized) {
-            LOBInputStream lobStream = (LOBInputStream) stream;
-            if (lobStream.isObsolete()) {
-                lobStream.reInitialize();
-                init (lobStream, pos);
+     * Updates the reader if the underlying data has been modified.
+     * <p>
+     * There are two cases to deal with:
+     * <ol> <li>The underlying data of the internal Clob representation has been
+     *          modified.</li>
+     *      <li>The internal Clob representation has changed.</li>
+     * </ol>
+     * The latter case happens when a read-only Clob, represented as a stream
+     * into store, is modified by the user and a new temporary internal
+     * representation is created.
+     *
+     * @throws IOException if verifying or updating the reader fails
+     */
+    private void updateReaderIfRequired() throws IOException {
+        // Case two as described above; changed representation.
+        if (iClob.isReleased()) {
+            iClob = this.clob.getInternalClob();
+            lastUpdateCount = -1;
+            // Check again. If both are closed, the Clob itself is closed.
+            if (iClob.isReleased()) {
+                close();
+                return;
             }
         }
-        else {
-            //clob won't be null if the stream wasn't materialized
-            //but still try to be safe
-            if (SanityManager.DEBUG) {
-                SanityManager.ASSERT (!(clob == null), 
-                        "Internal error while updating stream");
-            }
-            if (clob.getInternalClob().isWritable()) {
-                try {
-                    stream = clob.getInternalClob().getRawByteStream();
-                }
-                catch (SQLException e) {
-                    throw Util.newIOException(e);
-                }
-                init ((LOBInputStream) stream, pos);
-                materialized = true;
+        // Case one as described above; content has been modified.
+        if (lastUpdateCount != iClob.getUpdateCount()) {
+            lastUpdateCount = iClob.getUpdateCount();
+            try {
+                this.streamReader = iClob.getReader(pos);
+            } catch (SQLException sqle) {
+                throw new IOException(sqle.getMessage());
             }
         }
     }

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/EmbedClob.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/EmbedClob.java?rev=724294&r1=724293&r2=724294&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/EmbedClob.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/EmbedClob.java Mon Dec  8 01:08:27 2008
@@ -718,9 +718,7 @@
         }
         
         try {
-            return new ClobUpdatableReader(this,
-                                            pos-1,
-                                            length);
+            return new ClobUpdatableReader(this, pos, length);
         } catch (IOException ioe) {
             throw Util.setStreamFailure(ioe);
         } 

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/StoreStreamClob.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/StoreStreamClob.java?rev=724294&r1=724293&r2=724294&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/StoreStreamClob.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/StoreStreamClob.java Mon Dec  8 01:08:27 2008
@@ -32,6 +32,7 @@
 
 import java.sql.SQLException;
 import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.jdbc.CharacterStreamDescriptor;
 import org.apache.derby.iapi.reference.SQLState;
 import org.apache.derby.iapi.types.Resetable;
 import org.apache.derby.iapi.types.TypeId;
@@ -212,9 +213,17 @@
         } catch (StandardException se) {
             throw Util.generateCsSQLException(se);
         }
-        Reader reader = new UTF8Reader(this.positionedStoreStream,
-                                TypeId.CLOB_MAXWIDTH, this.conChild,
-                                this.synchronizationObject);
+        // Describe the stream to allow the reader to configure itself.
+        CharacterStreamDescriptor csd =
+                new CharacterStreamDescriptor.Builder().
+                stream(positionedStoreStream).bufferable(false).
+                positionAware(true).dataOffset(2L). // TODO
+                curCharPos(CharacterStreamDescriptor.BEFORE_FIRST).
+                maxCharLength(TypeId.CLOB_MAXWIDTH).
+                charLength(cachedCharLength). // 0 means unknown.
+                build();
+        Reader reader = new UTF8Reader(
+                csd, this.conChild, this.synchronizationObject);
         long leftToSkip = pos -1;
         long skipped;
         while (leftToSkip > 0) {
@@ -242,8 +251,24 @@
     public Reader getInternalReader(long characterPosition)
             throws IOException, SQLException {
         if (this.internalReader == null) {
-            this.internalReader = new UTF8Reader(positionedStoreStream,
-                    TypeId.CLOB_MAXWIDTH, conChild, synchronizationObject);
+            if (positionedStoreStream.getPosition() != 0) {
+                try {
+                    positionedStoreStream.resetStream();
+                } catch (StandardException se) {
+                    throw Util.generateCsSQLException(se);
+                }
+            }
+            // Describe the stream to allow the reader to configure itself.
+            CharacterStreamDescriptor csd =
+                    new CharacterStreamDescriptor.Builder().
+                    stream(positionedStoreStream).bufferable(false).
+                    positionAware(true).dataOffset(2L). // TODO: Fix offset.
+                    curCharPos(CharacterStreamDescriptor.BEFORE_FIRST).
+                    maxCharLength(TypeId.CLOB_MAXWIDTH).
+                    charLength(cachedCharLength). // 0 means unknown.
+                    build();
+            this.internalReader =
+                    new UTF8Reader(csd, conChild, synchronizationObject);
             this.unclosableInternalReader =
                     new FilterReader(this.internalReader) {
                         public void close() {

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/TemporaryClob.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/TemporaryClob.java?rev=724294&r1=724293&r2=724294&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/TemporaryClob.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/TemporaryClob.java Mon Dec  8 01:08:27 2008
@@ -30,6 +30,7 @@
 import java.io.Writer;
 import java.sql.SQLException;
 import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.jdbc.CharacterStreamDescriptor;
 import org.apache.derby.iapi.util.UTF8Util;
 
 /**
@@ -240,8 +241,15 @@
             throw new IllegalArgumentException(
                 "Position must be positive: " + pos);
         }
-        Reader isr = new ClobUpdatableReader (
-                (LOBInputStream) getRawByteStream(), conChild);
+        // Describe the stream to allow the reader to configure itself.
+        CharacterStreamDescriptor csd = new CharacterStreamDescriptor.Builder().
+                stream(this.bytes.getInputStream(0)).
+                positionAware(true).
+                bufferable(this.bytes.getLength() > 4096). // Cache if on disk.
+                byteLength(this.bytes.getLength()).
+                build();
+        Reader isr = new UTF8Reader(
+                csd, conChild, conChild.getConnectionSynchronization());
 
         long leftToSkip = pos -1;
         long skipped;

Modified: db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/UTF8Reader.java
URL: http://svn.apache.org/viewvc/db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/UTF8Reader.java?rev=724294&r1=724293&r2=724294&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/UTF8Reader.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/impl/jdbc/UTF8Reader.java Mon Dec  8 01:08:27 2008
@@ -30,7 +30,9 @@
 import java.sql.SQLException;
 
 import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.jdbc.CharacterStreamDescriptor;
 import org.apache.derby.iapi.services.sanity.SanityManager;
+import org.apache.derby.iapi.types.PositionedStream;
 import org.apache.derby.iapi.types.Resetable;
 
 /**
@@ -60,28 +62,14 @@
 
     /** The underlying data stream. */
     private InputStream in;
-    /** Store stream that can reposition itself on request. */
-    private final PositionedStoreStream positionedIn;
+    /** Stream that can reposition itself on request. */
+    private final PositionedStream positionedIn;
     /** Store last visited position in the store stream. */
     private long rawStreamPos = 0L;
-    /**
-     * The expected number of bytes in the stream, if known.
-     * <p>
-     * A value of <code>0<code> means the length is unknown, and that the end
-     * of the stream is marked with a Derby-specific end of stream marker.
-     */
-    private final long utfLen;          // bytes
     /** Number of bytes read from the stream. */
     private long       utfCount;        // bytes
     /** Number of characters read from the stream. */
     private long       readerCharCount; // characters
-    /** 
-     * The maximum number of characters allowed for the column
-     * represented by the passed stream.
-     * <p>
-     * A value of <code>0</code> means there is no associated maximum length.
-     */
-    private final long maxFieldSize;    // characters
 
     /** Internal character buffer storing characters read from the stream. */
     private final char[]   buffer;
@@ -102,6 +90,16 @@
     private ConnectionChild parent;
 
     /**
+     * Descriptor containing information about the stream.
+     * Except for the current positions, the information in this object is
+     * considered permanent and valid for the life-time of the stream.
+     */
+    private final CharacterStreamDescriptor csd;
+
+    /**
+     * TODO: This constructor will be removed! Is is currently retrofitted to
+     *  use a CharacterStreamDescriptor.
+     *
      * Constructs a reader and consumes the encoded length bytes from the
      * stream.
      * <p>
@@ -127,8 +125,8 @@
             throws IOException, SQLException
     {
         super(synchronization);
-        this.maxFieldSize = maxFieldSize;
         this.parent = parent;
+        long utfLen = 0;
 
         parent.setupContextStack();
         try {
@@ -142,14 +140,14 @@
                     // Note that buffering this UTF8Reader again, does not
                     // cause any trouble...
                     try {
-                        this.positionedIn.resetStream();
+                        ((Resetable)this.positionedIn).resetStream();
                     } catch (StandardException se) {
                         throw Util.newIOException(se);
                     }
                 } else {
                     this.positionedIn = null;
                 }
-                this.utfLen = readUnsignedShort();
+                utfLen = readUnsignedShort();
                 // Even if we are reading the encoded length, the stream may
                 // not be a positioned stream. This is currently true when a
                 // stream is passed in after a ResultSet.getXXXStream method.
@@ -170,45 +168,43 @@
             // filled three times to fill the internal character buffer.
             this.in = new BufferedInputStream(in, bufferSize);
         }
-    }
-
-    /**
-     * Constructs a <code>UTF8Reader</code> using a stream.
-     * <p>
-     * This consturctor accepts the stream size as parameter and doesn't
-     * attempt to read the length from the stream.
-     *
-     * @param in the underlying stream
-     * @param maxFieldSize the maximum allowed length for the associated column
-     * @param streamSize size of the underlying stream in bytes
-     * @param parent the connection child this stream is associated with
-     * @param synchronization object to synchronize on
-     */
-    public UTF8Reader(
-            InputStream in,
-            long maxFieldSize,
-            long streamSize,
-            ConnectionChild parent,
-            Object synchronization) {
-        super(synchronization);
-        this.maxFieldSize = maxFieldSize;
-        this.parent = parent;
-        this.utfLen = streamSize;
-        this.positionedIn = null;
-
-        if (SanityManager.DEBUG) {
-            // Do not allow the inputstream here to be a Resetable, as this
-            // means (currently, not by design...) that the length is encoded in
-            // the stream and we can't pass that out as data to the user.
-            SanityManager.ASSERT(!(in instanceof Resetable));
+        this.csd = new CharacterStreamDescriptor.Builder().
+                bufferable(positionedIn == null).
+                positionAware(positionedIn != null).byteLength(utfLen).
+                dataOffset(2).curBytePos(2).stream(in).
+                build();
+    }
+
+    public UTF8Reader(CharacterStreamDescriptor csd, ConnectionChild conChild,
+            Object sync) {
+        super(sync);
+        this.csd = csd;
+        this.positionedIn =
+                (csd.isPositionAware() ? csd.getPositionedStream() : null);
+        this.parent = conChild;
+
+        int buffersize = calculateBufferSize(csd);
+        this.buffer = new char[buffersize];
+
+        // Check and save the stream state.
+        if (SanityManager.DEBUG) { 
+            if (csd.isPositionAware()) {
+                SanityManager.ASSERT(
+                        csd.getCurBytePos() == positionedIn.getPosition());
+            }
+        }
+        this.rawStreamPos = positionedIn.getPosition();
+        // Make sure we start at the first data byte, not in the header.
+        if (rawStreamPos < csd.getDataOffset()) {
+            rawStreamPos = csd.getDataOffset();
+        }
+
+        // Buffer stream for improved performance, if appropriate.
+        if (csd.isBufferable()) {
+            this.in = new BufferedInputStream(csd.getStream(), buffersize);
+        } else {
+            this.in = csd.getStream();
         }
-        int bufferSize = calculateBufferSize(streamSize, maxFieldSize);
-        this.buffer = new char[bufferSize];
-        // Buffer this for improved performance.
-        // Note that the stream buffers bytes, whereas the internal buffer
-        // buffers characters. In worst case, the stream buffer must be filled
-        // three times to fill the internal character buffer.
-        this.in = new BufferedInputStream(in, bufferSize);
     }
 
     /*
@@ -465,6 +461,9 @@
                     throw Util.generateCsSQLException(se);
                 }
             }
+            // Keep track of how much we are allowed to read.
+            long utfLen = csd.getByteLength();
+            long maxFieldSize = csd.getMaxCharLength();
 readChars:
         while (
                 (charactersInBuffer < buffer.length) &&
@@ -475,7 +474,10 @@
             int c = in.read();
             if (c == -1) {
                 if (utfLen == 0) {
-                    closeIn();
+                    // Close the stream if it cannot be reset.
+                    if (!csd.isPositionAware()) {
+                        closeIn();
+                    }
                     break readChars;
                 }
                 throw utfFormatException("Reached EOF prematurely, " +
@@ -528,7 +530,10 @@
                             // we reached the end of a long string,
                             // that was terminated with
                             // (11100000, 00000000, 00000000)
-                            closeIn();
+                            // Close the stream if it cannot be reset.
+                            if (!csd.isPositionAware()) {
+                                closeIn();
+                            }
                             break readChars;
                         }
                         throw utfFormatException("Internal error: Derby-" +
@@ -570,7 +575,10 @@
             return false;
         }
 
-        closeIn();
+        // Close the stream if it cannot be reset.
+        if (!csd.isPositionAware()) {
+            closeIn();
+        }
         return true;
         } finally {
             parent.restoreContextStack();
@@ -591,10 +599,13 @@
      */
     private void resetUTF8Reader()
             throws IOException, StandardException {
-        // 2L to skip the length encoding bytes.
-        this.positionedIn.reposition(2L);
+        // Skip the length encoding bytes.
+        this.positionedIn.reposition(csd.getDataOffset());
         this.rawStreamPos = this.positionedIn.getPosition();
-        this.in = this.positionedIn;
+        // If bufferable, discard buffered stream and create a new one.
+        if (csd.isBufferable()) {
+            this.in = new BufferedInputStream(csd.getStream(), buffer.length);
+        }
         this.readerCharCount = this.utfCount = 0L;
         this.charactersInBuffer = this.readPositionInBuffer = 0;
     }
@@ -660,6 +671,8 @@
     }
 
     /**
+     * TODO: Remove this when CSD is fully integrated.
+     *
      * Calculates an optimized buffer size.
      * <p>
      * The maximum size allowed is returned if the specified values don't give
@@ -681,6 +694,34 @@
     }
 
     /**
+     * Calculates an optimized buffer size.
+     * <p>
+     * The maximum size allowed is returned if the specified values don't give
+     * enough information to say a smaller buffer size is preferable.
+     *
+     * @param csd stream descriptor
+     * @return An (sub)optimal buffer size.
+     */
+    private final int calculateBufferSize(CharacterStreamDescriptor csd) {
+        // Using the maximum buffer size will be optimal,
+        // unless the data is smaller than the maximum buffer.
+        int bufferSize = MAXIMUM_BUFFER_SIZE;
+        long knownLength = csd.getCharLength();
+        long maxCharLength = csd.getMaxCharLength();
+        if (knownLength < 1) {
+            // Unknown char length, use byte count instead (might be zero too).
+            knownLength = csd.getByteLength();
+        }
+        if (knownLength > 0 && knownLength < bufferSize) {
+            bufferSize = (int)knownLength;
+        }
+        if (maxCharLength > 0 && maxCharLength < bufferSize) {
+            bufferSize = (int)maxCharLength;
+        }
+        return bufferSize;
+    }
+
+    /**
      * Skips the requested number of characters.
      *
      * @param toSkip number of characters to skip


