activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r785769 - in /activemq/sandbox/activemq-flow: activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-store/src/test/java/org/apache/activemq/broker/store/
Date Wed, 17 Jun 2009 20:00:10 GMT
Author: cmacnaug
Date: Wed Jun 17 20:00:09 2009
New Revision: 785769

URL: http://svn.apache.org/viewvc?rev=785769&view=rev
Log:
Fixing up checkPoint cleanup, which did not preserve in-use data files

Modified:
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
Wed Jun 17 20:00:09 2009
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 
 import org.apache.activemq.broker.store.Store;
@@ -33,6 +34,7 @@
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
 import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.LongMarshaller;
@@ -287,5 +289,4 @@
             this.size = size;
         }
     }
-
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
Wed Jun 17 20:00:09 2009
@@ -193,6 +193,17 @@
                     }
                 }
             }
+            
+            if (deleteAllMessages) {
+                getJournal().start();
+                journal.delete();
+                journal.close();
+                journal = null;
+                getPageFile().delete();
+                rootEntity = new RootEntity();
+                LOG.info("Persistence store purged.");
+                deleteAllMessages = false;
+            }
 
             getJournal().start();
 
@@ -236,18 +247,7 @@
         try {
             open();
 
-            if (deleteAllMessages) {
-                journal.delete();
-
-                pageFile.unload();
-                pageFile.delete();
-                rootEntity = new RootEntity();
-
-                LOG.info("Persistence store purged.");
-                deleteAllMessages = false;
-
-                loadPageFile();
-            }
+            
             store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())),
null);
         } finally {
             indexLock.writeLock().unlock();
@@ -439,8 +439,9 @@
 
     protected void checkpointCleanup(final boolean cleanup) {
         try {
-            long start = System.currentTimeMillis();
             indexLock.writeLock().lock();
+            long start = System.currentTimeMillis();
+            
             try {
                 if (!opened.get()) {
                     return;
@@ -455,7 +456,11 @@
             }
             long end = System.currentTimeMillis();
             if (end - start > 1000) {
-                LOG.warn("KahaDB Cleanup took " + (end - start));
+                if (cleanup) {
+                    LOG.warn("KahaDB Cleanup took " + (end - start));
+                } else {
+                    LOG.warn("KahaDB CheckPoint took " + (end - start));
+                }
             }
         } catch (IOException e) {
             e.printStackTrace();
@@ -503,77 +508,7 @@
                 gcCandidateSet.removeAll(journalFilesBeingReplicated);
             }
 
-            // Don't GC files after the first in progress tx
-            Location firstTxLocation = rootEntity.getLastUpdate();
-
-            if (firstTxLocation != null) {
-                while (!gcCandidateSet.isEmpty()) {
-                    Integer last = gcCandidateSet.last();
-                    if (last >= firstTxLocation.getDataFileId()) {
-                        gcCandidateSet.remove(last);
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            // // Go through all the destinations to see if any of them can
-            // remove GC candidates.
-            // for (StoredDestinationState sd : storedDestinations.values()) {
-            // if( gcCandidateSet.isEmpty() ) {
-            // break;
-            // }
-            //                
-            // // Use a visitor to cut down the number of pages that we load
-            // dbstate.locationIndex.visit(tx, new BTreeVisitor<Location,
-            // Long>() {
-            // int last=-1;
-            // public boolean isInterestedInKeysBetween(Location first, Location
-            // second) {
-            // if( first==null ) {
-            // SortedSet<Integer> subset =
-            // gcCandidateSet.headSet(second.getDataFileId()+1);
-            // if( !subset.isEmpty() && subset.last() == second.getDataFileId()
-            // ) {
-            // subset.remove(second.getDataFileId());
-            // }
-            // return !subset.isEmpty();
-            // } else if( second==null ) {
-            // SortedSet<Integer> subset =
-            // gcCandidateSet.tailSet(first.getDataFileId());
-            // if( !subset.isEmpty() && subset.first() == first.getDataFileId()
-            // ) {
-            // subset.remove(first.getDataFileId());
-            // }
-            // return !subset.isEmpty();
-            // } else {
-            // SortedSet<Integer> subset =
-            // gcCandidateSet.subSet(first.getDataFileId(),
-            // second.getDataFileId()+1);
-            // if( !subset.isEmpty() && subset.first() == first.getDataFileId()
-            // ) {
-            // subset.remove(first.getDataFileId());
-            // }
-            // if( !subset.isEmpty() && subset.last() == second.getDataFileId()
-            // ) {
-            // subset.remove(second.getDataFileId());
-            // }
-            // return !subset.isEmpty();
-            // }
-            // }
-            //    
-            // public void visit(List<Location> keys, List<Long> values) {
-            // for (Location l : keys) {
-            // int fileId = l.getDataFileId();
-            // if( last != fileId ) {
-            // gcCandidateSet.remove(fileId);
-            // last = fileId;
-            // }
-            // }
-            // }
-            //    
-            // });
-            // }
+            rootEntity.removeGCCandidates(gcCandidateSet, tx);
 
             if (!gcCandidateSet.isEmpty()) {
                 if (LOG.isErrorEnabled()) {
@@ -646,7 +581,7 @@
                 LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start)
+ " ms, Index Update took " + (end - start2) + " ms");
             }
             return location;
-            
+
         } finally {
             if (tx == null)
                 indexLock.writeLock().unlock();

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
Wed Jun 17 20:00:09 2009
@@ -22,7 +22,10 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 
 import org.apache.activemq.broker.store.Store;
@@ -32,22 +35,28 @@
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.IntegerMarshaller;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.Marshaller;
 import org.apache.kahadb.util.VariableMarshaller;
 
 public class RootEntity {
 
+    //TODO remove this once performance testing is complete. 
+    private static final boolean USE_LOC_INDEX = true;
+
     public final static Marshaller<RootEntity> MARSHALLER = new VariableMarshaller<RootEntity>()
{
         public RootEntity readPayload(DataInput is) throws IOException {
             RootEntity rc = new RootEntity();
             rc.state = is.readInt();
             rc.maxMessageKey = is.readLong();
             rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
-            // rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
+            if (USE_LOC_INDEX)
+                rc.locationIndex = new BTreeIndex<Integer, Long>(is.readLong());
             rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
             rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
             if (is.readBoolean()) {
@@ -62,7 +71,8 @@
             os.writeInt(object.state);
             os.writeLong(object.maxMessageKey);
             os.writeLong(object.messageKeyIndex.getPageId());
-            // os.writeLong(object.locationIndex.getPageId());
+            if (USE_LOC_INDEX)
+                os.writeLong(object.locationIndex.getPageId());
             os.writeLong(object.destinationIndex.getPageId());
             os.writeLong(object.messageRefsIndex.getPageId());
             if (object.lastUpdate != null) {
@@ -86,7 +96,7 @@
     // Message Indexes
     private long maxMessageKey;
     private BTreeIndex<Long, Location> messageKeyIndex;
-    // private BTreeIndex<Location, Long> locationIndex;
+    private BTreeIndex<Integer, Long> locationIndex;
     private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref
     // count:
 
@@ -107,8 +117,8 @@
         state = KahaDBStore.CLOSED_STATE;
 
         messageKeyIndex = new BTreeIndex<Long, Location>(tx.getPageFile(), tx.allocate().getPageId());
-        // locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(),
-        // tx.allocate().getPageId());
+        if (USE_LOC_INDEX)
+            locationIndex = new BTreeIndex<Integer, Long>(tx.getPageFile(), tx.allocate().getPageId());
         destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(),
tx.allocate().getPageId());
         messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
 
@@ -128,10 +138,13 @@
                 maxMessageKey = last.getKey();
             }
         }
-        // locationIndex.setPageFile(tx.getPageFile());
-        // locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
-        // locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        // locationIndex.load(tx);
+
+        if (USE_LOC_INDEX) {
+            locationIndex.setPageFile(tx.getPageFile());
+            locationIndex.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+            locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+            locationIndex.load(tx);
+        }
 
         destinationIndex.setPageFile(tx.getPageFile());
         destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
@@ -245,15 +258,31 @@
             // Message existed.. undo the index update we just did. Chances
             // are it's a transaction replay.
             messageKeyIndex.put(tx, id, previous);
+        } else {
+            if (USE_LOC_INDEX) {
+                Long refs = locationIndex.get(tx, location.getDataFileId());
+                if (refs == null) {
+                    locationIndex.put(tx, location.getDataFileId(), new Long(1));
+                } else {
+                    locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue()
+ 1));
+                }
+            }
         }
     }
 
     public void messageRemove(Transaction tx, Long messageKey) throws IOException {
         // Location location = messageKeyIndex.remove(tx, messageKey);
-        messageKeyIndex.remove(tx, messageKey);
-        // if (location != null) {
-        // locationIndex.remove(tx, location);
-        // }
+        Location location = messageKeyIndex.remove(tx, messageKey);
+        if (USE_LOC_INDEX && location != null) {
+            Long refs = locationIndex.get(tx, location.getDataFileId());
+            if (refs != null) {
+                if (refs.longValue() <= 1) {
+                    locationIndex.remove(tx, location.getDataFileId());
+                } else {
+                    locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue()
- 1));
+                }
+            }
+        }
     }
 
     public Location messageGetLocation(Transaction tx, Long messageKey) {
@@ -436,34 +465,137 @@
         //TODO check that none of the locations specified by the indexes
         //are past the last update location in the journal. This can happen
         //if the index is flushed before the journal. 
-        //
-        //Collection<DestinationEntity> values = destinations.values();
-        //for (DestinationEntity de : values) {
-        //    count += 
-        //}
-        // Go through all the destinations to see if they have messages past
-        // the lastAppendLocation
-        //for (StoredDestinationState sd : 
-        //          
-        // final ArrayList<Long> matches = new ArrayList<Long>();
-        // // Find all the Locations that are >= than the last Append Location.
-        // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
-        // Long>(lastAppendLocation) {
-        // @Override
-        // protected void matched(Location key, Long value) {
-        // matches.add(value);
-        // }
-        // });
-        //            
-        //            
-        // for (Long sequenceId : matches) {
-        // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-        // sd.locationIndex.remove(tx, keys.location);
-        // sd.messageIdIndex.remove(tx, keys.messageId);
-        // undoCounter++;
-        // // TODO: do we need to modify the ack positions for the pub sub case?
-        // }
-        // }
-        return 0;
+        int count = 0;
+        
+        //TODO: It might be better to tie the index update to the journal write
+        //so that we can be sure that all journal entries are on disk prior to 
+        //index update. 
+
+        //Scan MessageKey Index to find message keys past the last append 
+        //location:
+//        final ArrayList<Long> matches = new ArrayList<Long>();
+//        messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation)
{
+//
+//            @Override
+//            protected void matched(Location key, Long value) {
+//                matches.add(value);
+//            }
+//        });
+        
+        
+//        for (Long sequenceId : matches) {
+//        MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+//        sd.locationIndex.remove(tx, keys.location);
+//        sd.messageIdIndex.remove(tx, keys.messageId);
+//        count++;
+//        }
+
+        //                 @Override
+        //                 protected void matched(Location key, Long value) {
+        //                 matches.add(value);
+        //                 }
+        //                 });
+        //                            
+        //                            
+        //                 for (Long sequenceId : matches) {
+        //                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+        //                 sd.locationIndex.remove(tx, keys.location);
+        //                 sd.messageIdIndex.remove(tx, keys.messageId);
+        //                 undoCounter++;
+        //             })
+
+        //        for (DestinationEntity de : destinations.values()) {
+        //             final ArrayList<Long> matches = new ArrayList<Long>();
+        //             // Find all the Locations that are >= than the last Append Location.
+        //             sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
+        //                 Long>(lastAppendLocation) {
+        //                 @Override
+        //                 protected void matched(Location key, Long value) {
+        //                 matches.add(value);
+        //                 }
+        //                 });
+        //                            
+        //                            
+        //                 for (Long sequenceId : matches) {
+        //                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+        //                 sd.locationIndex.remove(tx, keys.location);
+        //                 sd.messageIdIndex.remove(tx, keys.messageId);
+        //                 undoCounter++;
+        //             }
+        //        }
+        return count;
+    }
+
+    /**
+     * Go through indexes checking for data files that are still in use, and remove those from the GC candidate set.
+     * 
+     * @param gcCandidateSet
+     * @throws IOException
+     */
+    final void removeGCCandidates(final TreeSet<Integer> gcCandidateSet, Transaction
tx) throws IOException {
+
+        // Don't GC files after the first in progress tx
+        Location firstTxLocation = lastUpdate;
+
+        if (firstTxLocation != null) {
+            while (!gcCandidateSet.isEmpty()) {
+                Integer last = gcCandidateSet.last();
+                if (last >= firstTxLocation.getDataFileId()) {
+                    gcCandidateSet.remove(last);
+                } else {
+                    break;
+                }
+            }
+        }
+
+        if (gcCandidateSet.isEmpty()) {
+            return;
+        }
+
+        if (!USE_LOC_INDEX) {
+            return;
+        }
+
+        // Go through the location index to see if we can remove gc candidates:
+        // Use a visitor to cut down the number of pages that we load
+        locationIndex.visit(tx, new BTreeVisitor<Integer, Long>() {
+            int last = -1;
+
+            public boolean isInterestedInKeysBetween(Integer first, Integer second) {
+                if (first == null) {
+                    SortedSet<Integer> subset = gcCandidateSet.headSet(second + 1);
+                    if (!subset.isEmpty() && subset.last().equals(second)) {
+                        subset.remove(second);
+                    }
+                    return !subset.isEmpty();
+                } else if (second == null) {
+                    SortedSet<Integer> subset = gcCandidateSet.tailSet(first);
+                    if (!subset.isEmpty() && subset.first().equals(first)) {
+                        subset.remove(first);
+                    }
+                    return !subset.isEmpty();
+                } else {
+                    SortedSet<Integer> subset = gcCandidateSet.subSet(first, second
+ 1);
+                    if (!subset.isEmpty() && subset.first().equals(first)) {
+                        subset.remove(first);
+                    }
+                    if (!subset.isEmpty() && subset.last().equals(second)) {
+                        subset.remove(second);
+                    }
+                    return !subset.isEmpty();
+                }
+            }
+
+            public void visit(List<Integer> keys, List<Long> values) {
+                for (Integer l : keys) {
+                    int fileId = l;
+                    if (last != fileId) {
+                        gcCandidateSet.remove(fileId);
+                        last = fileId;
+                    }
+                }
+            }
+        });
+
     }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
Wed Jun 17 20:00:09 2009
@@ -38,8 +38,8 @@
 import org.apache.activemq.queue.QueueDescriptor;
 
 public abstract class StorePerformanceBase extends TestCase {
-
-    private static int PERFORMANCE_SAMPLES = 5;
+    
+    private static int PERFORMANCE_SAMPLES = 50;
     private static boolean SYNC_TO_DISK = true;
     private static final boolean USE_SHARED_WRITER = true;
 
@@ -56,20 +56,24 @@
     abstract protected Store createStore();
 
     private SharedWriter writer = null;
-    private Semaphore writePermits = null;
+    
+    private Semaphore enqueuePermits;
+    private Semaphore dequeuePermits;
 
     @Override
     protected void setUp() throws Exception {
         store = createStore();
+        //store.setDeleteAllMessages(false);
         store.start();
 
         if (USE_SHARED_WRITER) {
             writer = new SharedWriter();
             writer.start();
         }
-
-        writePermits = new Semaphore(1000);
-
+        
+        enqueuePermits = new Semaphore(20000000);
+        dequeuePermits = new Semaphore(0);
+        
         queueId = new QueueDescriptor();
         queueId.setQueueName(new AsciiBuffer("test"));
         store.execute(new VoidCallback<Exception>() {
@@ -78,6 +82,20 @@
                 session.queueAdd(queueId);
             }
         }, null);
+        
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<Store.QueueQueryResult> qqrs = session.queueList(queueId,
1);
+                assertTrue(qqrs.hasNext());
+                Store.QueueQueryResult qqr = qqrs.next();
+                if(qqr.getSize() > 0)
+                {
+                    queueKey.set(qqr.getLastSequence() + 1);
+                    System.out.println("Recovered queue: " + qqr.getDescriptor().getQueueName()
+ " with " + qqr.getCount() + " messages");
+                }                   
+            }
+        }, null);
     }
 
     @Override
@@ -100,8 +118,6 @@
         }
     }
 
-    private final Object wakeupMutex = new Object();
-
     class SharedWriter implements Runnable {
         LinkedBlockingQueue<SharedQueueOp> queue = new LinkedBlockingQueue<SharedQueueOp>(1000);
         private Thread thread;
@@ -200,8 +216,8 @@
 
         public void stop() throws InterruptedException {
             stopped.set(true);
-            while (writePermits.hasQueuedThreads()) {
-                writePermits.release();
+            while (enqueuePermits.hasQueuedThreads()) {
+                enqueuePermits.release();
             }
             thread.join();
         }
@@ -211,7 +227,7 @@
                 Buffer buffer = new Buffer(new byte[1024]);
                 for (long i = 0; !stopped.get(); i++) {
 
-                    writePermits.acquireUninterruptibly();
+                    enqueuePermits.acquire();
 
                     final MessageRecord messageRecord = new MessageRecord();
                     messageRecord.setKey(store.allocateStoreTracking());
@@ -223,10 +239,6 @@
                     SharedQueueOp op = new SharedQueueOp() {
                         public void run() {
                             rate.increment();
-                            writePermits.release();
-                            synchronized (wakeupMutex) {
-                                wakeupMutex.notify();
-                            }
                         }
                     };
 
@@ -239,6 +251,7 @@
                             queueRecord.setQueueKey(queueKey.incrementAndGet());
                             queueRecord.setSize(messageRecord.getSize());
                             session.queueAddMessage(queueId, queueRecord);
+                            dequeuePermits.release();
                         }
                     };
 
@@ -299,6 +312,7 @@
                     SharedQueueOp op = new SharedQueueOp() {
                         public void run() {
                             rate.increment(records.size());
+                            enqueuePermits.release(records.size());
                             queryWait.release();
                         }
                     };
@@ -324,15 +338,7 @@
                         writer.addOp(op);
                     }
 
-                    //queryWait.acquireUninterruptibly();
-                    if (records.isEmpty()) {
-                        //                        synchronized (wakeupMutex) {
-                        //                            try {
-                        //                                wakeupMutex.wait(500);
-                        //                            } catch (InterruptedException e) {
-                        //                            }
-                        //                        }
-                    }
+                    dequeuePermits.acquire();
                     records.clear();
                 }
             } catch (InterruptedException e) {
@@ -346,6 +352,12 @@
         }
     }
 
+    public void test1_1_0() throws Exception {
+        startProducers(1);
+        reportRates();
+    }
+    
+    
     public void test1_1_1() throws Exception {
         startProducers(1);
         startConsumers(1);



Mime
View raw message