Subject: svn commit: r1138442 - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/usecases/ activemq-web/src/main/java/org/apache/activemq/web/ kahadb/src/main/java/org/apache/kahadb/index/ kahadb/src/main/java/org/apache/kahadb/page/
Date: Wed, 22 Jun 2011 12:55:37 -0000
To: commits@activemq.apache.org
From: dejanb@apache.org
Message-Id: <20110622125537.36218238896F@eris.apache.org>

Author: dejanb
Date: Wed Jun 22 12:55:36 2011
New Revision: 1138442

URL: http://svn.apache.org/viewvc?rev=1138442&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3374 - first stab at fixing the long KahaDB tx OOM problem

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
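The change in a nutshell: a KahaDB transaction previously buffered every page write on the heap until commit, so a single very large transaction could run the broker out of memory. This commit spills page data to a per-transaction temp file once a size threshold is crossed and keeps only file offsets in memory. A minimal, hypothetical sketch of that pattern (a simplified stand-in, not the committed code; the real change threads this through PageFile.PageWrite and Transaction below):

    // Hypothetical, simplified illustration of the fix: keep small transactions
    // on the heap, spill big ones to a temp file and remember only offsets.
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.TreeMap;

    class SpillingWriteBuffer {
        private final long maxInMemoryBytes;  // threshold; 10 MiB in the commit
        private final File tmpFile;
        private RandomAccessFile spill;       // opened lazily on first overflow
        private long size;                    // total bytes accepted so far
        private int nextLocation;             // next free offset in the temp file
        // pageId -> byte[] (on heap) or long[]{offset, length} (in temp file)
        private final TreeMap<Long, Object> writes = new TreeMap<Long, Object>();

        SpillingWriteBuffer(long maxInMemoryBytes, File tmpFile) {
            this.maxInMemoryBytes = maxInMemoryBytes;
            this.tmpFile = tmpFile;
        }

        void write(long pageId, byte[] data) throws IOException {
            size += data.length;
            if (size > maxInMemoryBytes) {
                if (spill == null) {
                    spill = new RandomAccessFile(tmpFile, "rw");
                }
                spill.seek(nextLocation);
                spill.write(data);
                writes.put(pageId, new long[] { nextLocation, data.length });
                nextLocation += data.length;
            } else {
                writes.put(pageId, data);     // small so far: keep bytes in memory
            }
        }
    }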
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java Wed Jun 22 12:55:36 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
 import java.lang.management.ManagementFactory;
 import javax.jms.Connection;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -43,7 +44,15 @@ public class DurableUnsubscribeTest exte
         Destination d = broker.getDestination(topic);
         assertEquals("Subscription is missing.", 1, d.getConsumers().size());
 
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(topic);
+        for (int i = 0; i < 1000; i++) {
+            producer.send(session.createTextMessage("text"));
+        }
+
+        Thread.sleep(1000);
+
         session.unsubscribe("SubsId");
         session.close();
@@ -92,7 +101,7 @@ public class DurableUnsubscribeTest exte
     private void createBroker() throws Exception {
         broker = BrokerFactory.createBroker("broker:(vm://localhost)");
-        broker.setPersistent(false);
+        //broker.setPersistent(false);
         broker.setUseJmx(true);
         broker.setBrokerName(getName());
         broker.start();
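The test now runs against a persistent broker and pushes 1000 messages through the durable topic before unsubscribing, so unsubscription has to clean up real KahaDB state. It unsubscribes a durable subscription named "SubsId" that the test registers earlier; a minimal sketch of how such a subscription is typically created (the topic name and client ID here are assumptions, not taken from this commit):

    // Assumed setup (not part of this commit): registering the "SubsId"
    // durable subscription that the hunk above later unsubscribes.
    import javax.jms.Connection;
    import javax.jms.Session;
    import javax.jms.Topic;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class DurableSubSketch {
        public static void main(String[] args) throws Exception {
            Connection connection =
                    new ActiveMQConnectionFactory("vm://localhost").createConnection();
            connection.setClientID("durableClient"); // durable subs require a client ID
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("TestTopic"); // topic name assumed
            // Register the subscription; closing the consumer leaves it durable,
            // so session.unsubscribe("SubsId") later has state to clean up.
            session.createDurableSubscriber(topic, "SubsId").close();
            connection.close();
        }
    }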
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java Wed Jun 22 12:55:36 2011
@@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory;
  * stored inside a HttpSession TODO controls to prevent DOS attacks with users
  * requesting many consumers TODO configure consumers with small prefetch.
  *
- *
+ *
+ *
  */
 public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java Wed Jun 22 12:55:36 2011
@@ -345,7 +345,7 @@ public class HashIndex implem
         tx.store(metadata.page, metadataMarshaller, true);
         calcThresholds();
-        LOG.debug("Resizing done. New bins start at: "+metadata.binPageId);
+        LOG.debug("Resizing done. New bins start at: "+metadata.binPageId);
         resizeCapacity=0;
         resizePageId=0;
     }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Wed Jun 22 12:55:36 2011
@@ -138,15 +138,35 @@ public class PageFile {
         Page page;
         byte[] current;
         byte[] diskBound;
+        int currentLocation = -1;
+        int diskBoundLocation = -1;
+        File tmpFile;
+        int length;
 
         public PageWrite(Page page, byte[] data) {
             this.page=page;
             current=data;
         }
+
+        public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
+            this.page = page;
+            this.currentLocation = currentLocation;
+            this.tmpFile = tmpFile;
+            this.length = length;
+        }
 
         public void setCurrent(Page page, byte[] data) {
             this.page=page;
             current=data;
+            currentLocation = -1;
+            diskBoundLocation = -1;
+        }
+
+        public void setCurrentLocation(Page page, int location, int length) {
+            this.page = page;
+            this.currentLocation = location;
+            this.length = length;
+            this.current = null;
         }
 
         @Override
@@ -158,22 +178,42 @@ public class PageFile {
         public Page getPage() {
             return page;
         }
+
+        public byte[] getDiskBound() throws IOException {
+            if (diskBound == null && diskBoundLocation != -1) {
+                diskBound = new byte[length];
+                RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
+                file.seek(diskBoundLocation);
+                int readNum = file.read(diskBound);
+                file.close();
+                diskBoundLocation = -1;
+            }
+            return diskBound;
+        }
 
         void begin() {
-            diskBound = current;
-            current = null;
+            if (currentLocation != -1) {
+                diskBoundLocation = currentLocation;
+                currentLocation = -1;
+                current = null;
+            } else {
+                diskBound = current;
+                current = null;
+                currentLocation = -1;
+            }
         }
 
         /**
          * @return true if there are no pending writes to do.
         */
         boolean done() {
+            diskBoundLocation = -1;
             diskBound=null;
-            return current == null;
+            return current == null || currentLocation == -1;
         }
 
         boolean isDone() {
-            return diskBound == null && current == null;
+            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
         }
     }
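PageWrite's begin()/done() pair implements a small double buffer: when a batch starts, the pending bytes (or, after this commit, the temp-file offset) are frozen as disk-bound, and callers may keep updating the write while the flush is in flight. A stripped-down sketch of that lifecycle, assuming the in-memory case only (the commit adds a parallel temp-file-offset variant of each field):

    // Stripped-down sketch of the PageWrite double buffer.
    class DoubleBufferedWrite {
        byte[] current;   // latest bytes, still replaceable by new updates
        byte[] diskBound; // frozen snapshot the writer thread is flushing

        synchronized void update(byte[] data) {
            current = data;               // coalesce: a later update wins
        }

        synchronized void begin() {       // called by the writer before the flush
            diskBound = current;
            current = null;
        }

        synchronized boolean done() {     // called after the flush lands on disk
            diskBound = null;
            return current == null;       // false -> another round is pending
        }
    }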
@@ -470,7 +510,7 @@ public class PageFile {
         return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
     }
 
-    private long toOffset(long pageId) {
+    public long toOffset(long pageId) {
         return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
     }
 
@@ -823,6 +863,8 @@ public class PageFile {
             }
         }
 
+        boolean longTx = false;
+
         for (Map.Entry<Long, PageWrite> entry : updates) {
             Long key = entry.getKey();
             PageWrite value = entry.getValue();
             PageWrite write = writes.get(key);
             if( write==null ) {
                 writes.put(key, value);
             } else {
-                write.setCurrent(value.page, value.current);
+                if (value.currentLocation != -1) {
+                    write.setCurrentLocation(value.page, value.currentLocation, value.length);
+                    write.tmpFile = value.tmpFile;
+                    longTx = true;
+                } else {
+                    write.setCurrent(value.page, value.current);
+                }
             }
         }
 
         // Once we start approaching capacity, notify the writer to start writing
-        if( canStartWriteBatch() ) {
+        // sync immediately for long txs
+        if( longTx || canStartWriteBatch() ) {
+
             if( enabledWriteThread ) {
                 writes.notify();
             } else {
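The flush trigger above now has two conditions: the usual capacity check, plus an immediate flush whenever a spilled ("long tx") write enters the cache, so temp-file data reaches the main page file promptly. A hedged restatement of the decision (simplified; the real capacity test lives in canStartWriteBatch()):

    // Hedged restatement of the flush-trigger policy; "batchThreshold" stands
    // in for whatever canStartWriteBatch() actually measures.
    class BatchPolicy {
        static boolean shouldNotifyWriter(boolean longTx, int cachedWrites, int batchThreshold) {
            // sync immediately for long (spilled) txs, otherwise batch as before
            return longTx || cachedWrites >= batchThreshold;
        }
    }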
@@ -919,115 +969,90 @@ public class PageFile {
         }
     }
 
-    /**
-     *
-     * @return true if there are still pending writes to do.
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    private void writeBatch() throws IOException {
-
-        CountDownLatch checkpointLatch;
-        ArrayList<PageWrite> batch;
-        synchronized( writes ) {
+    private void writeBatch() throws IOException {
+
+        CountDownLatch checkpointLatch;
+        ArrayList<PageWrite> batch;
+        synchronized( writes ) {
             // If there is not enough to write, wait for a notification...
             batch = new ArrayList<PageWrite>(writes.size());
             // build a write batch from the current write cache.
             for (PageWrite write : writes.values()) {
                 batch.add(write);
-                // Move the current write to the diskBound write, this lets folks update the
+                // Move the current write to the diskBound write, this lets folks update the
                 // page again without blocking for this write.
                 write.begin();
-                if (write.diskBound == null) {
+                if (write.diskBound == null && write.diskBoundLocation == -1) {
                     batch.remove(write);
                 }
             }
 
-            // Grab on to the existing checkpoint latch cause once we do this write we can
+            // Grab on to the existing checkpoint latch cause once we do this write we can
             // release the folks that were waiting for those writes to hit disk.
             checkpointLatch = this.checkpointLatch;
             this.checkpointLatch=null;
-        }
-
-        try {
-            if (enableRecoveryFile) {
+        }
 
-                // Using Adler-32 instead of CRC-32 because it's much faster and
-                // it's
-                // weakness for short messages with few hundred bytes is not a
-                // factor in this case since we know
-                // our write batches are going to much larger.
-                Checksum checksum = new Adler32();
-                for (PageWrite w : batch) {
-                    try {
-                        checksum.update(w.diskBound, 0, pageSize);
-                    } catch (Throwable t) {
-                        throw IOExceptionSupport.create(
-                                "Cannot create recovery file. Reason: " + t, t);
-                    }
-                }
-
-                // Can we shrink the recovery buffer??
-                if (recoveryPageCount > recoveryFileMaxPageCount) {
-                    int t = Math.max(recoveryFileMinPageCount, batch.size());
-                    recoveryFile.setLength(recoveryFileSizeForPages(t));
-                }
-
-                // Record the page writes in the recovery buffer.
-                recoveryFile.seek(0);
-                // Store the next tx id...
-                recoveryFile.writeLong(nextTxid.get());
-                // Store the checksum for the write batch so that on recovery we
-                // know if we have a consistent
-                // write batch on disk.
-                recoveryFile.writeLong(checksum.getValue());
-                // Write the # of pages that will follow
-                recoveryFile.writeInt(batch.size());
-
-                // Write the pages.
-                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
-
-                for (PageWrite w : batch) {
-                    recoveryFile.writeLong(w.page.getPageId());
-                    recoveryFile.write(w.diskBound, 0, pageSize);
-                }
-
-                if (enableDiskSyncs) {
-                    // Sync to make sure recovery buffer writes land on disk..
-                    recoveryFile.getFD().sync();
-                }
-
-                recoveryPageCount = batch.size();
-            }
-
-            for (PageWrite w : batch) {
-                writeFile.seek(toOffset(w.page.getPageId()));
-                writeFile.write(w.diskBound, 0, pageSize);
-                w.done();
-            }
-
-            // Sync again
-            if (enableDiskSyncs) {
-                writeFile.getFD().sync();
-            }
-
-        } finally {
-            synchronized (writes) {
-                for (PageWrite w : batch) {
-                    // If there are no more pending writes, then remove it from
-                    // the write cache.
-                    if (w.isDone()) {
-                        writes.remove(w.page.getPageId());
-                    }
-                }
-            }
-
-            if( checkpointLatch!=null ) {
-                checkpointLatch.countDown();
-            }
-        }
-    }
+        Checksum checksum = new Adler32();
+        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+        for (PageWrite w : batch) {
+            if (enableRecoveryFile) {
+                try {
+                    checksum.update(w.getDiskBound(), 0, pageSize);
+                } catch (Throwable t) {
+                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+                }
+                recoveryFile.writeLong(w.page.getPageId());
+                recoveryFile.write(w.getDiskBound(), 0, pageSize);
+            }
+
+            writeFile.seek(toOffset(w.page.getPageId()));
+            writeFile.write(w.getDiskBound(), 0, pageSize);
+            w.done();
+        }
+
+        try {
+            if (enableRecoveryFile) {
+                // Can we shrink the recovery buffer??
+                if (recoveryPageCount > recoveryFileMaxPageCount) {
+                    int t = Math.max(recoveryFileMinPageCount, batch.size());
+                    recoveryFile.setLength(recoveryFileSizeForPages(t));
+                }
+
+                // Record the page writes in the recovery buffer.
+                recoveryFile.seek(0);
+                // Store the next tx id...
+                recoveryFile.writeLong(nextTxid.get());
+                // Store the checksum for the write batch so that on recovery we
+                // know if we have a consistent
+                // write batch on disk.
+                recoveryFile.writeLong(checksum.getValue());
+                // Write the # of pages that will follow
+                recoveryFile.writeInt(batch.size());
+            }
+
+            if (enableDiskSyncs) {
+                // Sync to make sure recovery buffer writes land on disk..
+                recoveryFile.getFD().sync();
+                writeFile.getFD().sync();
+            }
+        } finally {
+            synchronized (writes) {
+                for (PageWrite w : batch) {
+                    // If there are no more pending writes, then remove it from
                    // the write cache.
+                    if (w.isDone()) {
+                        writes.remove(w.page.getPageId());
+                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

     private long recoveryFileSizeForPages(int pageCount) {
         return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
@@ -1135,4 +1160,7 @@ public class PageFile {
         return getMainPageFile();
     }
 
+    public File getDirectory() {
+        return directory;
+    }
 }
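The rewritten writeBatch() streams each batch entry to the recovery file and the main page file in a single pass, pulling spilled data lazily via getDiskBound() and accumulating an Adler-32 checksum as it goes; only then does it write the recovery header (tx id, checksum, page count). Adler-32 is used over CRC-32 for speed, and its weakness on very short inputs does not matter for page-sized batches. A runnable sketch of just the checksum part (the page data here is dummy):

    // One Adler-32 value accumulated over every page image in the batch,
    // stored in the recovery header so recovery can tell whether the batch
    // on disk is consistent.
    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    public class BatchChecksum {
        static long checksumPages(byte[][] pages, int pageSize) {
            Checksum checksum = new Adler32();
            for (byte[] page : pages) {
                checksum.update(page, 0, pageSize); // same call the diff makes
            }
            return checksum.getValue();
        }

        public static void main(String[] args) {
            byte[][] pages = { new byte[4096], new byte[4096] }; // dummy pages
            System.out.println("batch checksum: " + checksumPages(pages, 4096));
        }
    }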
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Wed Jun 22 12:55:36 2011
@@ -16,22 +16,11 @@
  */
 package org.apache.kahadb.page;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import java.io.*;
+import java.util.*;
 
 import org.apache.kahadb.page.PageFile.PageWrite;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.*;
 
 /**
  * The class used to read/update a PageFile object. Using a transaction allows you to
@@ -39,6 +28,11 @@ import org.apache.kahadb.util.SequenceSe
  */
 public class Transaction implements Iterable<Page> {
 
+    private RandomAccessFile tmpFile;
+    private File txfFile;
+    private int nextLocation = 0;
+
     /**
      * The PageOverflowIOException occurs when a page write is requested
     * and its data is larger than what would fit into a single page.
@@ -91,12 +85,16 @@ public class Transaction implements Iter
     // If this transaction is updating stuff.. this is the tx of
     private long writeTransactionId=-1;
     // List of pages that this transaction has modified.
-    private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>();
+    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
     // List of pages allocated in this transaction
     private final SequenceSet allocateList = new SequenceSet();
     // List of pages freed in this transaction
     private final SequenceSet freeList = new SequenceSet();
 
+    private long maxTransactionSize = 10485760;
+
+    private long size = 0;
+
     Transaction(PageFile pageFile) {
         this.pageFile = pageFile;
     }
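Replacing the HashMap of pending writes with a TreeMap means iteration happens in ascending page-id order; my reading (the log does not say) is that this keeps flushes of spilled transactions roughly sequential in the page file instead of scattered in hash order. A tiny demonstration of the ordering guarantee:

    import java.util.TreeMap;

    public class OrderedWrites {
        public static void main(String[] args) {
            TreeMap<Long, String> writes = new TreeMap<Long, String>();
            writes.put(42L, "page 42");
            writes.put(7L, "page 7");
            writes.put(19L, "page 19");
            // TreeMap iterates in ascending key order: 7, 19, 42
            for (Long pageId : writes.keySet()) {
                System.out.println(pageId + " -> " + writes.get(pageId));
            }
        }
    }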
@@ -650,7 +648,16 @@ public class Transaction implements Iter
             allocateList.clear();
             writes.clear();
             writeTransactionId = -1;
+            if (tmpFile != null) {
+                tmpFile.close();
+                if (!getTempFile().delete()) {
+                    throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile());
+                }
+                tmpFile = null;
+                txfFile = null;
+            }
         }
+        size = 0;
     }
 
@@ -665,7 +672,16 @@ public class Transaction implements Iter
             allocateList.clear();
             writes.clear();
             writeTransactionId = -1;
+            if (tmpFile != null) {
+                tmpFile.close();
+                if (!getTempFile().delete()) {
+                    throw new IOException("Can't delete temporary KahaDB transaction file:" + getTempFile());
+                }
+                tmpFile = null;
+                txfFile = null;
+            }
         }
+        size = 0;
     }
 
     private long getWriteTransactionId() {
@@ -675,16 +691,36 @@ public class Transaction implements Iter
         return writeTransactionId;
     }
 
+    protected File getTempFile() {
+        if (txfFile == null) {
+            txfFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName(Long.toString(getWriteTransactionId())) + ".tmp");
+        }
+        return txfFile;
+    }
+
     /**
      * Queues up a page write that should get done when commit() gets called.
      */
     @SuppressWarnings("unchecked")
     private void write(final Page page, byte[] data) throws IOException {
         Long key = page.getPageId();
-        // TODO: if a large update transaction is in progress, we may want to move
-        // all the current updates to a temp file so that we don't keep using
-        // up memory.
-        writes.put(key, new PageWrite(page, data));
+        size += data.length;
+
+        PageWrite write;
+        if (size > maxTransactionSize) {
+            if (tmpFile == null) {
+                tmpFile = new RandomAccessFile(getTempFile(), "rw");
+            }
+            int location = nextLocation;
+            tmpFile.seek(nextLocation);
+            tmpFile.write(data);
+            nextLocation = location + data.length;
+            write = new PageWrite(page, location, data.length, getTempFile());
+        } else {
+            write = new PageWrite(page, data);
+        }
+        writes.put(key, write);
     }
 
 /**
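The write() change above is where the old TODO is finally acted on: the temp file named after the write transaction id absorbs everything past the threshold. A worked example of when the spill path engages (the page size is an assumption; KahaDB's is configurable, while 10485760 is the default maxTransactionSize from the diff):

    // Back-of-the-envelope: roughly how many page writes fit in memory
    // before a transaction starts spilling to its "<txid>.tmp" file.
    public class SpillThreshold {
        public static void main(String[] args) {
            long maxTransactionSize = 10485760; // default from the diff (10 MiB)
            int pageSize = 4 * 1024;            // assumed 4 KiB pages
            System.out.println("spill starts after ~"
                    + (maxTransactionSize / pageSize) + " page writes"); // 2560
        }
    }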