activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1160681 - in /activemq/trunk/kahadb/src: main/java/org/apache/kahadb/page/PageFile.java main/java/org/apache/kahadb/page/Transaction.java test/java/org/apache/kahadb/index/BTreeIndexTest.java
Date Tue, 23 Aug 2011 13:34:10 GMT
Author: gtully
Date: Tue Aug 23 13:34:09 2011
New Revision: 1160681

URL: http://svn.apache.org/viewvc?rev=1160681&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3466: IndexOutOfBounds in KahaDB with a large number
of subscriptions and pending messages. Use long locations so that temp file appends do not
overflow, and ensure page file overflow does not leave an oversized chunk: link pages until the
overflow fits in a page. Also adds some additional tests.

Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Tue Aug 23 13:34:09 2011
@@ -140,8 +140,8 @@ public class PageFile {
         Page page;
         byte[] current;
         byte[] diskBound;
-        int currentLocation = -1;
-        int diskBoundLocation = -1;
+        long currentLocation = -1;
+        long diskBoundLocation = -1;
         File tmpFile;
         int length;
 
@@ -150,7 +150,7 @@ public class PageFile {
             current=data;
         }
 
-        public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
+        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
             this.page = page;
             this.currentLocation = currentLocation;
             this.tmpFile = tmpFile;
@@ -164,7 +164,7 @@ public class PageFile {
             diskBoundLocation = -1;
         }
 
-        public void setCurrentLocation(Page page, int location, int length) {
+        public void setCurrentLocation(Page page, long location, int length) {
             this.page = page;
             this.currentLocation = location;
             this.length = length;
@@ -186,7 +186,7 @@ public class PageFile {
                 diskBound = new byte[length];
                 RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
                 file.seek(diskBoundLocation);
-                int readNum = file.read(diskBound);
+                file.read(diskBound);
                 file.close();
                 diskBoundLocation = -1;
             }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Tue Aug 23 13:34:09 2011
@@ -30,7 +30,7 @@ public class Transaction implements Iter
 
     private RandomAccessFile tmpFile;
     private File txFile;
-    private int nextLocation = 0;
+    private long nextLocation = 0;
 
     /**
      * The PageOverflowIOException occurs when a page write is requested
@@ -277,36 +277,38 @@ public class Transaction implements Iter
                     // If overflow is allowed
                     if (overflow) {
 
-                        Page next;
-                        if (current.getType() == Page.PAGE_PART_TYPE) {
-                            next = load(current.getNext(), null);
-                        } else {
-                            next = allocate();
-                        }
-
-                        next.txId = current.txId;
-
-                        // Write the page header
-                        int oldPos = pos;
-                        pos = 0;
-
-                        current.makePagePart(next.getPageId(), getWriteTransactionId());
-                        current.write(this);
-
-                        // Do the page write..
-                        byte[] data = new byte[pageSize];
-                        System.arraycopy(buf, 0, data, 0, pageSize);
-                        Transaction.this.write(current, data);
-
-                        // Reset for the next page chunk
-                        pos = 0;
-                        // The page header marshalled after the data is written.
-                        skip(Page.PAGE_HEADER_SIZE);
-                        // Move the overflow data after the header.
-                        System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
-                        pos += oldPos - pageSize;
-                        current = next;
+                        do {
+                            Page next;
+                            if (current.getType() == Page.PAGE_PART_TYPE) {
+                                next = load(current.getNext(), null);
+                            } else {
+                                next = allocate();
+                            }
+
+                            next.txId = current.txId;
+
+                            // Write the page header
+                            int oldPos = pos;
+                            pos = 0;
+
+                            current.makePagePart(next.getPageId(), getWriteTransactionId());
+                            current.write(this);
+
+                            // Do the page write..
+                            byte[] data = new byte[pageSize];
+                            System.arraycopy(buf, 0, data, 0, pageSize);
+                            Transaction.this.write(current, data);
+
+                            // Reset for the next page chunk
+                            pos = 0;
+                            // The page header marshalled after the data is written.
+                            skip(Page.PAGE_HEADER_SIZE);
+                            // Move the overflow data after the header.
+                            System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
+                            pos += oldPos - pageSize;
+                            current = next;
 
+                        } while (pos > pageSize);
                     } else {
                         throw new PageOverflowIOException("Page overflow.");
                     }
@@ -705,7 +707,7 @@ public class Transaction implements Iter
             if (tmpFile == null) {
                 tmpFile = new RandomAccessFile(getTempFile(), "rw");
             }
-            int location = nextLocation;
+            long location = nextLocation;
             tmpFile.seek(nextLocation);
             tmpFile.write(data);
             nextLocation = location + data.length;

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Tue Aug 23 13:34:09 2011
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
@@ -220,8 +221,13 @@ public class BTreeIndexTest extends Inde
         index.remove(tx, key(1566));
     }
 
-    public void x_testLargeValue() throws Exception {
-        createPageFileAndIndex(4*1024);
+    public void testLargeValue() throws Exception {
+        //System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.load();
+        tx = pf.tx();
         long id = tx.allocate().getPageId();
         tx.commit();
 
@@ -232,9 +238,9 @@ public class BTreeIndexTest extends Inde
         tx.commit();
 
         tx =  pf.tx();
-        String val = new String(new byte[93]);
-        final long numMessages = 2000;
-        final int numConsumers = 10000;
+        String val = new String(new byte[1024]);
+        final long numMessages = 10;
+        final int numConsumers = 200;
 
         for (long i=0; i<numMessages; i++) {
             HashSet<String> hs = new HashSet<String>();
@@ -243,13 +249,57 @@ public class BTreeIndexTest extends Inde
             }
             test.put(tx, i, hs);
         }
+        tx.commit();
+        tx =  pf.tx();
+        for (long i=0; i<numMessages; i++) {
+            HashSet<String> hs = new HashSet<String>();
+            for (int j=numConsumers; j<numConsumers*2;j++) {
+                hs.add(val + "SOME TEXT" + j);
+            }
+            test.put(tx, i, hs);
+        }
 
+        tx.commit();
+        tx =  pf.tx();
         for (long i=0; i<numMessages; i++) {
             test.get(tx, i);
         }
         tx.commit();
     }
 
+    public void testLargeValueOverflow() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        BTreeIndex<Long, String> test = new BTreeIndex<Long, String>(pf, id);
+        test.setKeyMarshaller(LongMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        final int stringSize = 6*1024;
+        tx =  pf.tx();
+        String val = new String(new byte[stringSize]);
+        final long numMessages = 1;
+
+        for (long i=0; i<numMessages; i++) {
+            test.put(tx, i, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i=0; i<numMessages; i++) {
+            String s = test.get(tx, i);
+            assertEquals("len is as expected", stringSize, s.length());
+        }
+        tx.commit();
+    }
+
     void doInsertReverse(int count) throws Exception {
         for (int i = count-1; i >= 0; i--) {
             index.put(tx, key(i), (long)i);



Mime
View raw message