activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r633603 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async: AsyncDataManager.java DataFileAccessor.java DataFileAppender.java JournalFacade.java NIODataFileAppender.java
Date Tue, 04 Mar 2008 19:37:01 GMT
Author: chirino
Date: Tue Mar  4 11:36:58 2008
New Revision: 633603

URL: http://svn.apache.org/viewvc?rev=633603&view=rev
Log:
The AsyncDataManager now supports doing a callback notification for when a write 
has been secured to disk.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=633603&r1=633602&r2=633603&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
Tue Mar  4 11:36:58 2008
@@ -408,6 +408,23 @@
         }
     }
 
+    public void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
+        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
+        unUsed.removeAll(inUse);
+                
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (Integer key : unUsed) {
+        	// Only add files less than the lastFile..
+        	if( key.intValue() < lastFile.intValue() ) {
+                DataFile dataFile = (DataFile)fileMap.get(key);
+                purgeList.add(dataFile);
+        	}
+        }
+        for (DataFile dataFile : purgeList) {
+            forceRemoveDataFile(dataFile);
+        }
+	}
+
     public synchronized void consolidateDataFiles() throws IOException {
         List<DataFile> purgeList = new ArrayList<DataFile>();
         for (DataFile dataFile : fileMap.values()) {
@@ -482,8 +499,12 @@
                     cur.setOffset(0);
                 } else {
                     // Set to the next offset..
-                    cur = new Location(location);
-                    cur.setOffset(cur.getOffset() + cur.getSize());
+                	if( location.getSize() == -1 ) {
+                		cur = new Location(location);
+                	}  else {
+	            		cur = new Location(location);
+	            		cur.setOffset(location.getOffset()+location.getSize());
+                	}
                 }
             } else {
                 cur.setOffset(cur.getOffset() + cur.getSize());
@@ -606,6 +627,11 @@
         Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
         return loc;
     }
+    
+    public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
+        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
+        return loc;
+    }
 
     public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
         return appender.storeItem(data, type, sync);
@@ -686,4 +712,28 @@
     public Set<File> getFiles(){
         return fileByFileMap.keySet();
     }
+
+	synchronized public long getDiskSize() {
+		long rc=0;
+        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+        while( cur !=null ) {
+        	rc += cur.getLength();
+        	cur = (DataFile) cur.getNext();
+        }
+		return rc;
+	}
+
+	synchronized public long getDiskSizeUntil(Location startPosition) {
+		long rc=0;
+        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+        while( cur !=null ) {
+        	if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
+        		return rc + startPosition.getOffset();
+        	}
+        	rc += cur.getLength();
+        	cur = (DataFile) cur.getNext();
+        }
+		return rc;
+	}
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java?rev=633603&r1=633602&r2=633603&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
Tue Mar  4 11:36:58 2008
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?rev=633603&r1=633602&r2=633603&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
Tue Mar  4 11:36:58 2008
@@ -20,7 +20,6 @@
 import java.io.InterruptedIOException;
 import java.io.RandomAccessFile;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.activemq.util.ByteSequence;
@@ -41,7 +40,7 @@
 
     protected final AsyncDataManager dataManager;
     protected final Map<WriteKey, WriteCommand> inflightWrites;
-    protected final Object enqueueMutex = new Object();
+    protected final Object enqueueMutex = new Object(){};
     protected WriteBatch nextWriteBatch;
 
     protected boolean shutdown;
@@ -110,12 +109,21 @@
         public final Location location;
         public final ByteSequence data;
         final boolean sync;
+        public final Runnable onComplete;
 
         public WriteCommand(Location location, ByteSequence data, boolean sync) {
             this.location = location;
             this.data = data;
             this.sync = sync;
+            this.onComplete=null;
         }
+
+        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
+            this.location = location;
+            this.data = data;
+			this.onComplete = onComplete;
+            this.sync = false;
+		}
     }
 
 
@@ -176,6 +184,34 @@
 
         return location;
     }
+    
+	public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+        // Write the packet to our internal buffer.
+        int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        WriteBatch batch;
+        WriteCommand write = new WriteCommand(location, data, onComplete);
+
+        // Locate datafile and enqueue into the executor in synchronized block so
+        // that
+        // writes get enqueued onto the executor in order that they were assigned
+        // by
+        // the data manager (which is basically just appending)
+
+        synchronized (this) {
+            // Find the position where this item will land at.
+            DataFile dataFile = dataManager.allocateLocation(location);
+            batch = enqueue(dataFile, write);
+        }
+        location.setLatch(batch.latch);
+        inflightWrites.put(new WriteKey(location), write);
+
+        return location;
+	}
 
     private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
         synchronized (enqueueMutex) {
@@ -342,8 +378,8 @@
                     buff.reset();
                 }
 
-                file.getFD().sync();
-
+                file.getFD().sync();                
+                
                 WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
                 dataManager.setLastAppendLocation(lastWrite.location);
 
@@ -358,6 +394,13 @@
                     if (!write.sync) {
                         inflightWrites.remove(new WriteKey(write.location));
                     }
+                    if( write.onComplete !=null ) {
+                    	 try {
+							write.onComplete.run();
+						} catch (Throwable e) {
+							e.printStackTrace();
+						}
+                    }
                     write = (WriteCommand)write.getNext();
                 }
             }
@@ -377,5 +420,6 @@
             shutdownDone.countDown();
         }
     }
+
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java?rev=633603&r1=633602&r2=633603&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
Tue Mar  4 11:36:58 2008
@@ -105,5 +105,11 @@
         ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
         return convertToRecordLocation(dataManager.write(sequence, sync));
     }
+    
+    public RecordLocation write(Packet packet, Runnable onComplete) throws IOException, IllegalStateException {
+        org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
+        ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
+        return convertToRecordLocation(dataManager.write(sequence, onComplete));
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?rev=633603&r1=633602&r2=633603&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
Tue Mar  4 11:36:58 2008
@@ -166,6 +166,13 @@
                     if (!write.sync) {
                         inflightWrites.remove(new WriteKey(write.location));
                     }
+                    if (write.onComplete != null) {
+						try {
+							write.onComplete.run();
+						} catch (Throwable e) {
+							e.printStackTrace();
+						}
+					}
                     write = (WriteCommand)write.getNext();
                 }
             }



Mime
View raw message