jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dpfis...@apache.org
Subject svn commit: r513340 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal: AbstractJournal.java AppendRecord.java DatabaseJournal.java FileJournal.java FileRecordLog.java
Date Thu, 01 Mar 2007 14:14:18 GMT
Author: dpfister
Date: Thu Mar  1 06:14:17 2007
New Revision: 513340

URL: http://svn.apache.org/viewvc?view=rev&rev=513340
Log:
JCR-773 - Under heavy load, database journal may contain empty update records.

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?view=diff&rev=513340&r1=513339&r2=513340
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Thu Mar  1 06:14:17 2007
@@ -22,7 +22,7 @@
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.io.File;
+import java.io.InputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -273,11 +273,13 @@
      * Append a record backed by a file. Subclass responsibility.
      *
      * @param producerId producer identifier
+     * @param in input stream
+     * @param length number of bytes in input stream
      * @return the new record's revision
      *
      * @throws JournalException if an error occurs
      */
-    protected abstract long append(String producerId, File file)
+    protected abstract long append(String producerId, InputStream in, int length)
             throws JournalException;
 
     /**

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java?view=diff&rev=513340&r1=513339&r2=513340
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java Thu Mar  1 06:14:17 2007
@@ -19,9 +19,14 @@
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.DataOutputStream;
+import java.io.InputStream;
 import java.io.IOException;
 
 /**
@@ -45,6 +50,16 @@
     private static final String DEFAULT_EXT = ".tmp";
 
     /**
+     * Default size for in-memory records.
+     */
+    private static final int DEFAULT_IN_MEMORY_SIZE = 1024;
+
+    /**
+     * Maximum size for in-memory records.
+     */
+    private static final int MAXIMUM_IN_MEMORY_SIZE = 65536;
+
+    /**
      * Journal where record is being appended.
      */
     private final AbstractJournal journal;
@@ -60,14 +75,29 @@
     private long revision;
 
     /**
+     * Underlying data output.
+     */
+    private DataOutputStream dataOut;
+
+    /**
+     * Underlying byte output.
+     */
+    private ByteArrayOutputStream byteOut;
+
+    /**
      * Underlying file.
      */
     private File file;
 
     /**
-     * Underlying data output.
+     * Underlying file output.
      */
-    private DataOutputStream dataOut;
+    private FileOutputStream fileOut;
+
+    /**
+     * Flag indicating whether the output is closed.
+     */
+    private boolean outputClosed;
 
     /**
      * Create a new instance of this class.
@@ -81,6 +111,9 @@
         this.journal = journal;
         this.producerId = producerId;
         this.revision = 0L;
+
+        byteOut = new ByteArrayOutputStream(DEFAULT_IN_MEMORY_SIZE);
+        dataOut = new DataOutputStream(byteOut);
     }
 
     /**
@@ -108,7 +141,7 @@
      * {@inheritDoc}
      */
     public void writeByte(int n) throws JournalException {
-        open();
+        checkOutput();
 
         try {
             dataOut.writeByte(n);
@@ -122,7 +155,7 @@
      * {@inheritDoc}
      */
     public void writeChar(char c) throws JournalException {
-        open();
+        checkOutput();
 
         try {
             dataOut.writeChar(c);
@@ -136,7 +169,7 @@
      * {@inheritDoc}
      */
     public void writeBoolean(boolean b) throws JournalException {
-        open();
+        checkOutput();
 
         try {
             dataOut.writeBoolean(b);
@@ -150,7 +183,7 @@
      * {@inheritDoc}
      */
     public void writeInt(int n) throws JournalException {
-        open();
+        checkOutput();
 
         try {
             dataOut.writeInt(n);
@@ -164,7 +197,7 @@
      * {@inheritDoc}
      */
     public void writeString(String s) throws JournalException {
-        open();
+        checkOutput();
 
         try {
             if (s == null) {
@@ -183,7 +216,7 @@
      * {@inheritDoc}
      */
     public void write(byte[] b) throws JournalException {
-        open();
+        checkOutput();
 
         try {
             dataOut.write(b);
@@ -200,9 +233,22 @@
         boolean succeeded = false;
 
         try {
-            close();
-            revision = journal.append(producerId, file);
-            succeeded = true;
+            int length = dataOut.size();
+            closeOutput();
+
+            InputStream in = openInput();
+
+            try {
+                revision = journal.append(producerId, in, length);
+                succeeded = true;
+            } finally {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    String msg = "I/O error while closing stream.";
+                    log.warn(msg, e);
+                }
+            }
         } finally {
             dispose();
 
@@ -214,44 +260,80 @@
      * {@inheritDoc}
      */
     public void cancelUpdate() {
-        if (dataOut != null) {
+        if (!outputClosed) {
             dispose();
-            
+
             journal.unlock(false);
         }
     }
 
     /**
-     * Create temporary file and open data output on it.
+     * Open input on record written.
+     */
+    private InputStream openInput() throws JournalException {
+        if (file != null) {
+            try {
+                return new FileInputStream(file);
+            } catch (IOException e) {
+                String msg = "Unable to open file input on: " + file.getPath();
+                throw new JournalException(msg, e);
+            }
+        } else {
+            return new ByteArrayInputStream(byteOut.toByteArray());
+        }
+    }
+
+    /**
+     * Check output size and eventually switch to file output.
      *
      * @throws JournalException
      */
-    private void open() throws JournalException {
-        if (file == null) {
+    private void checkOutput() throws JournalException {
+        if (outputClosed) {
+            throw new IllegalStateException("Output closed.");
+        }
+        if (fileOut == null && byteOut.size() >= MAXIMUM_IN_MEMORY_SIZE) {
             try {
                 file = File.createTempFile(DEFAULT_PREFIX, DEFAULT_EXT);
-                dataOut = new DataOutputStream(new FileOutputStream(file));
             } catch (IOException e) {
                 String msg = "Unable to create temporary file.";
                 throw new JournalException(msg, e);
             }
+            try {
+                fileOut = new FileOutputStream(file);
+            } catch (FileNotFoundException e) {
+                String msg = "Unable to open output stream on: " + file.getPath();
+                throw new JournalException(msg, e);
+            }
+            dataOut = new DataOutputStream(fileOut);
+
+            try {
+                dataOut.write(byteOut.toByteArray());
+            } catch (IOException e) {
+                String msg = "Unable to write in-memory record to file.";
+                throw new JournalException(msg, e);
+            }
         }
     }
 
     /**
-     * Close this record, keeping the underlying file.
+     * Close output, keeping the underlying file.
      *
      * @throws JournalException if an error occurs
      */
-    private void close() throws JournalException {
-        if (dataOut != null) {
+    private void closeOutput() throws JournalException {
+        if (!outputClosed) {
             try {
-                dataOut.close();
+                if (fileOut != null) {
+                    dataOut.flush();
+                    fileOut.getFD().sync();
+                    dataOut.close();
+                }
             } catch (IOException e) {
                 String msg = "I/O error while closing stream.";
                 throw new JournalException(msg, e);
             } finally {
-                dataOut = null;
+                outputClosed = true;
             }
         }
     }
@@ -260,14 +342,14 @@
      * Dispose this record, deleting the underlying file.
      */
     private void dispose() {
-        if (dataOut != null) {
+        if (!outputClosed) {
             try {
                 dataOut.close();
             } catch (IOException e) {
                 String msg = "I/O error while closing stream.";
                 log.warn(msg, e);
             } finally {
-                dataOut = null;
+                outputClosed = true;
             }
         }
         if (file != null) {

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java?view=diff&rev=513340&r1=513339&r2=513340
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Thu Mar  1 06:14:17 2007
@@ -21,11 +21,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.sql.PreparedStatement;
@@ -282,24 +279,22 @@
     /**
      * {@inheritDoc}
      */
-    protected long append(String producerId, File file) throws JournalException {
-        try {
-            InputStream in = new BufferedInputStream(new FileInputStream(file));
+    protected long append(String producerId, InputStream in, int length)
+            throws JournalException {
 
+        try {
             try {
                 insertRevisionStmt.clearParameters();
                 insertRevisionStmt.clearWarnings();
                 insertRevisionStmt.setLong(1, lockedRevision);
                 insertRevisionStmt.setString(2, getId());
                 insertRevisionStmt.setString(3, producerId);
-                insertRevisionStmt.setBinaryStream(4, in, (int) file.length());
+                insertRevisionStmt.setBinaryStream(4, in, length);
                 insertRevisionStmt.execute();
 
                 con.commit();
                 return lockedRevision;
             } finally {
-                close(in);
-
                 try {
                     con.setAutoCommit(true);
                 } catch (SQLException e) {
@@ -307,9 +302,6 @@
                     log.warn(msg, e);
                 }
             }
-        } catch (IOException e) {
-            String msg = "Unable to open journal log '" + file + "'.";
-            throw new JournalException(msg, e);
         } catch (SQLException e) {
             String msg = "Unable to append revision " + lockedRevision + ".";
             throw new JournalException(msg, e);

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java?view=diff&rev=513340&r1=513339&r2=513340
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java Thu Mar  1 06:14:17 2007
@@ -24,6 +24,7 @@
 import java.util.Comparator;
 import java.io.File;
 import java.io.FilenameFilter;
+import java.io.InputStream;
 import java.io.IOException;
 
 /**
@@ -158,7 +159,9 @@
     /**
      * {@inheritDoc}
      */
-    protected long append(String producerId, File file) throws JournalException {
+    protected long append(String producerId, InputStream in, int length)
+            throws JournalException {
+
         try {
             FileRecordLog recordLog = new FileRecordLog(journalFile);
             if (recordLog.exceeds(maximumSize)) {
@@ -168,7 +171,7 @@
             if (recordLog.isNew()) {
                 recordLog.init(globalRevision.get());
             }
-            long revision = recordLog.append(getId(), producerId, file);
+            long revision = recordLog.append(getId(), producerId, in, length);
             globalRevision.set(revision);
             return revision;
 

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java?view=diff&rev=513340&r1=513339&r2=513340
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java Thu Mar  1 06:14:17 2007
@@ -237,28 +237,35 @@
     }
 
     /**
-     * Append a record backed by a file to this log. Returns the revision
-     * following this record.
+     * Append a record to this log. Returns the revision following this record.
      *
      * @param journalId journal identifier
      * @param producerId producer identifier
-     * @param file record to add
+     * @param in record to add
+     * @param length record length
      * @throws java.io.IOException if an I/O error occurs
      */
-    public long append(String journalId, String producerId, File file)
+    public long append(String journalId, String producerId, InputStream in, int length)
             throws IOException {
 
         DataOutputStream out = new DataOutputStream(new FileOutputStream(logFile, true));
 
         try {
-            int recordLength = (int) file.length();
             out.writeUTF(journalId);
             out.writeUTF(producerId);
-            out.writeInt(recordLength);
-            append(file, out);
+            out.writeInt(length);
+
+            byte[] buffer = new byte[8192];
+            int len;
+
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            out.flush();
+
             lastRevision += 2 + utfLength(journalId) +
                 2 + utfLength(producerId) +
-                4 + file.length();
+                4 + length;
             return lastRevision;
         } finally {
             close(out);
@@ -340,27 +347,6 @@
         } catch (IOException e) {
             String msg = "I/O error while closing input stream.";
             log.warn(msg, e);
-        }
-    }
-
-    /**
-     * Append a file to this log's output stream.
-     *
-     * @param file file to append
-     * @param out where to append to
-     */
-    private static void append(File file, DataOutputStream out) throws IOException {
-        byte[] buffer = new byte[8192];
-        int len;
-
-        InputStream in = new BufferedInputStream(new FileInputStream(file));
-        try {
-            while ((len = in.read(buffer)) > 0) {
-                out.write(buffer, 0, len);
-            }
-            out.flush();
-        } finally {
-            close(in);
         }
     }
 



Mime
View raw message