activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r475775 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl: ./ data/
Date Thu, 16 Nov 2006 15:37:36 GMT
Author: chirino
Date: Thu Nov 16 07:37:35 2006
New Revision: 475775

URL: http://svn.apache.org/viewvc?view=rev&rev=475775
Log:
Started on an AsyncDataFileWriter implementation so that we can get more concurrent writes
and batch up the file sync() calls.


Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=475775&r1=475774&r2=475775
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
Thu Nov 16 07:37:35 2006
@@ -74,6 +74,7 @@
     private String mode;
     private boolean initialized;
     private boolean logIndexChanges=false;
+    private boolean useAsyncWriter=false;
     private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
     private FileLock lock;
     private String indexType=IndexTypes.DISK_INDEX;
@@ -314,6 +315,7 @@
         if(dm==null){
             dm=new DataManager(directory,name);
             dm.setMaxFileLength(maxDataFileLength);
+            dm.setUseAsyncWriter(isUseAsyncWriter());
             recover(dm);
             dataManagers.put(name,dm);
         }
@@ -522,6 +524,14 @@
             
         }
     }
+
+	public synchronized boolean isUseAsyncWriter() {
+		return useAsyncWriter;
+	}
+
+	public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
+		this.useAsyncWriter = useAsyncWriter;
+	}
 
     
    

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java?view=auto&rev=475775
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
Thu Nov 16 07:37:35 2006
@@ -0,0 +1,325 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+import edu.emory.mathcs.backport.java.util.LinkedList;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+
+/**
+ * Optimized Store writer that uses an async thread do batched writes to 
+ * the datafile.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class AsyncDataFileWriter implements DataFileWriter {
+    
+	private static final Object SHUTDOWN_COMMAND = new Object();
+	    
+    /**
+     * Immutable description of one queued write: the file to write to, the
+     * bytes and how many of them to write, the file offset to write at, and
+     * the latch of the batch the write was assigned to.
+     */
+    static class WriteCommand {
+        final RandomAccessFile dataFile;
+        final byte[] data;
+        final long offset;
+        final int size;
+        final CountDownLatch latch;
+
+        /**
+         * @param dataFile file the bytes are written to
+         * @param data buffer holding the bytes; only the first size bytes are written
+         * @param offset position in dataFile at which the write starts
+         * @param size number of bytes of data to write
+         * @param latch latch that was current when the write was queued
+         */
+        public WriteCommand(RandomAccessFile dataFile, byte[] data, long offset, int size, CountDownLatch latch) {
+            this.latch = latch;
+            this.size = size;
+            this.offset = offset;
+            this.data = data;
+            this.dataFile = dataFile;
+        }
+
+    }
+    
+    private DataManager dataManager;
+    
+    private final Object enqueueMutex = new Object();
+    private final LinkedList queue = new LinkedList();
+    private final UsageManager usage = new UsageManager();     
+    private CountDownLatch latchAssignedToNewWrites = new CountDownLatch(1);
+    
+    private boolean running;
+    private boolean shutdown;
+    private IOException firstAsyncException;
+    private final CountDownLatch shutdownDone = new CountDownLatch(1);
+
+    
+    /**
+     * Construct a Store writer
+     * 
+     * @param file
+     */
+    AsyncDataFileWriter(DataManager fileManager){
+        this.dataManager=fileManager;
+        this.usage.setLimit(1024*1024*8); // Allow about 8 megs of concurrent data to be
queued up
+    }
+    
+	/**
+	 * Waits until the latch currently associated with the given data file has
+	 * been released. The writer thread releases a batch latch right after it
+	 * has called sync() on the batch's file.
+	 *
+	 * @param dataFile the data file whose queued writes should be waited for
+	 * @throws InterruptedIOException if the calling thread is interrupted while
+	 *             waiting; the thread's interrupt status is restored first
+	 * @throws IOException declared by the DataFileWriter contract
+	 */
+	public void force(final DataFile dataFile) throws IOException {
+		final CountDownLatch latch;
+		synchronized( enqueueMutex ) {
+			// storeItem/updateItem attach the latch to the file under this mutex.
+			latch = (CountDownLatch) dataFile.getWriterData();
+		}
+
+		if( latch==null ) {
+			// No latch was attached to this file, so there is nothing to wait for.
+			return;
+		}
+
+		try {
+			latch.await();
+		} catch (InterruptedException e) {
+			// Converting to an IOException must not hide the interrupt from
+			// code further up the stack.
+			Thread.currentThread().interrupt();
+			throw new InterruptedIOException();
+		}
+	}
+	
+    /**
+     * Marshals the payload into a private buffer, reserves a location for it
+     * through the data manager and queues the write for the writer thread.
+     * The bytes may not have reached the file yet when this method returns.
+     *
+     * @param marshaller writes the payload into the buffer
+     * @param payload the object to store
+     * @param type item type byte written at the start of the item header
+     * @return the location assigned to the item
+     * @throws InterruptedIOException if interrupted while waiting for queue
+     *             space; the thread's interrupt status is restored first
+     * @throws IOException if marshalling fails, the writer has been shut down,
+     *             or an earlier asynchronous write failed
+     */
+    public StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
+        // We may need to slow down if we are pounding the async thread too
+        // hard..
+        try {
+            usage.waitForSpace();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+
+        // Write the payload after a gap for the header, then rewind and fill
+        // in the header (type byte + payload length).
+        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        buffer.position(DataManager.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload,buffer);
+        final int size=buffer.size();
+        final int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(payloadSize);
+
+        final DataItem item=new DataItem();
+        item.setSize(payloadSize);
+
+        usage.increaseUsage(size);
+        boolean enqueued=false;
+        try {
+            // Locate the datafile and enqueue inside one synchronized block so
+            // that writes are queued in the order in which the data manager
+            // assigned their positions (which is basically just appending).
+            synchronized(enqueueMutex) {
+                // Find the position where this item will land at.
+                final DataFile dataFile=dataManager.findSpaceForData(item);
+                dataManager.addInterestInFile(dataFile);
+                dataFile.setWriterData(latchAssignedToNewWrites);
+                enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), item.getOffset(), size, latchAssignedToNewWrites));
+            }
+            enqueued=true;
+        } finally {
+            if( !enqueued ) {
+                // The writer thread only decreases the usage for commands it
+                // actually dequeues, so give the space back ourselves.
+                usage.decreaseUsage(size);
+            }
+        }
+
+        return item;
+    }
+    
+    /**
+     * Marshals the payload into a private buffer and queues a write of it at
+     * the offset of an existing location. The bytes may not have reached the
+     * file yet when this method returns.
+     *
+     * @param location the location whose data file and offset are written to
+     * @param marshaller writes the payload into the buffer
+     * @param payload the object to store
+     * @param type item type byte written at the start of the item header
+     * @throws InterruptedIOException if interrupted while waiting for queue
+     *             space; the thread's interrupt status is restored first
+     * @throws IOException if marshalling fails, the data file cannot be
+     *             located, the writer has been shut down, or an earlier
+     *             asynchronous write failed
+     */
+    public void updateItem(final StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
+        // We may need to slow down if we are pounding the async thread too
+        // hard..
+        try {
+            usage.waitForSpace();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+
+        // Write the payload after a gap for the header, then rewind and fill
+        // in the header (type byte + payload length).
+        final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        buffer.position(DataManager.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload,buffer);
+        final int size=buffer.size();
+        final int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(payloadSize);
+        final DataFile dataFile = dataManager.getDataFile(location);
+
+        usage.increaseUsage(size);
+        boolean enqueued=false;
+        try {
+            // Enqueue the write to the async thread.
+            synchronized(enqueueMutex) {
+                dataFile.setWriterData(latchAssignedToNewWrites);
+                enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), location.getOffset(), size, latchAssignedToNewWrites));
+            }
+            enqueued=true;
+        } finally {
+            if( !enqueued ) {
+                // The writer thread only decreases the usage for commands it
+                // actually dequeues, so give the space back ourselves.
+                usage.decreaseUsage(size);
+            }
+        }
+    }
+
+    private void enqueue(Object command) throws IOException {
+    	if( shutdown ) {
+    		throw new IOException("Async Writter Thread Shutdown");
+    	}
+    	if( firstAsyncException !=null )
+    		throw firstAsyncException;
+    	
+    	if( !running ) {
+    		running=true;
+    		Thread thread = new Thread() {
+    			public void run() {
+    				processQueue();
+    			}
+    		};
+    		thread.setPriority(Thread.MAX_PRIORITY);
+    		thread.setDaemon(true);
+    		thread.setName("ActiveMQ Data File Writer");
+    		thread.start();
+    	}
+  		queue.addLast(command);
+  		enqueueMutex.notify();
+    }
+    
+	/**
+	 * Removes and returns the first command in the queue, waiting on
+	 * enqueueMutex while the queue is empty.
+	 *
+	 * @return the next command, or SHUTDOWN_COMMAND if the wait is interrupted
+	 */
+	private Object dequeue() {
+		synchronized( enqueueMutex ) {
+			try {
+				// Re-check after every wake-up; wait() releases the mutex
+				// while blocked.
+				while( queue.isEmpty() ) {
+					enqueueMutex.wait();
+				}
+			} catch (InterruptedException e) {
+				return SHUTDOWN_COMMAND;
+			}
+			return queue.removeFirst();
+		}
+	}
+    
+    /**
+     * Marks the writer as shut down and waits for the shutdownDone latch.
+     * If the writer thread is running, a SHUTDOWN_COMMAND is queued for it and
+     * the thread releases the latch when it exits; otherwise the latch is
+     * released here. Repeated calls only wait on the latch.
+     *
+     * @throws InterruptedIOException if the calling thread is interrupted while
+     *             waiting; the thread's interrupt status is restored first
+     * @throws IOException declared by the DataFileWriter contract
+     */
+    public void close() throws IOException {
+        synchronized( enqueueMutex ) {
+            if( !shutdown ) {
+                shutdown = true;
+                if( running ) {
+                    queue.add(SHUTDOWN_COMMAND);
+                    enqueueMutex.notify();
+                } else {
+                    // No writer thread was started, so nobody else would
+                    // release the latch awaited below.
+                    shutdownDone.countDown();
+                }
+            }
+        }
+
+        try {
+            shutdownDone.await();
+        } catch (InterruptedException e) {
+            // Converting to an IOException must not hide the interrupt from
+            // code further up the stack.
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+
+    }
+
+    boolean isShutdown() {
+    	synchronized( enqueueMutex ) {
+    		return shutdown;
+    	}    	
+    }
+    
+    /**
+     * The async processing loop that writes to the data files and
+     * does the force calls.  
+     * 
+     * Since the file sync() call is the slowest of all the operations, 
+     * this algorithm tries to 'batch' or group together several file sync() requests 
+     * into a single file sync() call. The batching is accomplished attaching the 
+     * same CountDownLatch instance to every force request in a group.
+     * 
+     */
+    private void processQueue() {
+    	try {
+    		CountDownLatch currentBatchLatch=null;
+    		RandomAccessFile currentBatchDataFile=null;
+	    	while( !isShutdown() ) {
+	    		
+	    		// Block till we get a command.
+	    		Object o = dequeue();
+	    		
+	        	if( o == SHUTDOWN_COMMAND ) {
+	        		if( currentBatchLatch!=null ) {
+	        			currentBatchDataFile.getFD().sync();
+	        			currentBatchLatch.countDown();
+	        		}
+	        		break;
+	        	} else if( o.getClass() == CountDownLatch.class ) {
+		        	// The CountDownLatch is used as the end of batch indicator.	        		
+	        		// Must match..  
+	        		if( o == currentBatchLatch ) {
+	        			currentBatchDataFile.getFD().sync();
+	        			currentBatchLatch.countDown();
+	        			currentBatchLatch=null;
+	        			currentBatchDataFile=null;
+	        		} else {
+	        			new IOException("Got an out of sequence end of end of batch indicator.");
+	        		}
+	        		
+	        	} else if( o.getClass() == WriteCommand.class ) {
+	        		
+        			WriteCommand write = (WriteCommand) o;
+
+        			if( currentBatchDataFile == null )
+        				currentBatchDataFile = write.dataFile;
+        			
+        			// We may need to prematurely sync if the batch
+        			// if user is switching between data files.
+        			if( currentBatchDataFile!=write.dataFile ) {
+	        			currentBatchDataFile.getFD().sync();
+	        			currentBatchDataFile = write.dataFile;
+        			}
+        			
+        			// Write to the data..
+		        	write.dataFile.seek(write.offset);
+		        	write.dataFile.write(write.data,0,write.size);
+		        	usage.decreaseUsage(write.size);
+        			
+	        		// Start of a batch..
+	        		if( currentBatchLatch == null ) {
+	        			currentBatchLatch = write.latch;
+
+        	        	synchronized(enqueueMutex) {
+        	        		// get the request threads to start using a new latch..
+        	        		// write commands allready in the queue should have the 
+        	        		// same latch assigned.
+        	        		latchAssignedToNewWrites = new CountDownLatch(1);
+        	        		// enqueue an end of batch indicator..
+        	        		queue.add(currentBatchLatch);
+        	          		enqueueMutex.notify();
+        	        	}
+        	        	
+	        		} else if( currentBatchLatch!=write.latch ) { 
+	        			// the latch on subsequent writes should match.
+	        			new IOException("Got an out of sequence write.");
+	        		}
+	        	}
+	    	}
+	    	
+		} catch (IOException e) {
+	    	synchronized( enqueueMutex ) {
+	    		firstAsyncException = e;
+	    	}
+		} finally {
+    		shutdownDone.countDown();
+    	}
+    }
+        
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?view=diff&rev=475775&r1=475774&r2=475775
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
Thu Nov 16 07:37:35 2006
@@ -27,10 +27,12 @@
  * @version $Revision: 1.1.1.1 $
  */
 class DataFile{
+	
     private File file;
     private Integer number;
     private int referenceCount;
     private RandomAccessFile randomAcessFile;
+    private Object writerData;
     long length=0;
 
     DataFile(File file,int number){
@@ -70,12 +72,6 @@
         return file.delete();
     }
 
-    synchronized void force() throws IOException{
-        if(randomAcessFile!=null){
-            randomAcessFile.getFD().sync();
-        }
-    }
-
     synchronized void close() throws IOException{
         if(randomAcessFile!=null){
             randomAcessFile.close();
@@ -98,4 +94,19 @@
         String result = file.getName() + " number = " + number + " , length = " + length
+ " refCount = " + referenceCount;
         return result;
     }
+
+    /**
+     * @return Opaque data that a DataFileWriter may want to associate with the DataFile.
+     */
+	public synchronized Object getWriterData() {
+		return writerData;
+	}
+
+	/**
+	 * @param writerData - Opaque data that a DataFileWriter may want to associate with the
DataFile.
+	 */
+	public synchronized void setWriterData(Object writerData) {
+		this.writerData = writerData;
+	}
+
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java?view=auto&rev=475775
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
Thu Nov 16 07:37:35 2006
@@ -0,0 +1,28 @@
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+
+/**
+ * Writer used by the DataManager to put items into its data files.
+ */
+interface DataFileWriter {
+
+	/**
+	 * Stores a new item.
+	 *
+	 * @param marshaller marshals the payload
+	 * @param payload the object to store
+	 * @param type the item type byte
+	 * @return the location assigned to the stored item
+	 * @throws IOException if the item cannot be stored
+	 */
+	StoreLocation storeItem(Marshaller marshaller, Object payload,
+			byte type) throws IOException;
+
+	/**
+	 * Writes a payload at an existing location.
+	 *
+	 * @param location the location to write at
+	 * @param marshaller marshals the payload
+	 * @param payload the object to store
+	 * @param type the item type byte
+	 * @throws IOException if the item cannot be written
+	 */
+	void updateItem(StoreLocation location, Marshaller marshaller,
+			Object payload, byte type) throws IOException;
+
+	/**
+	 * Forces the writes made to the given data file through this writer.
+	 *
+	 * @param dataFile the data file to force
+	 * @throws IOException if forcing fails
+	 */
+	void force(DataFile dataFile) throws IOException;
+
+	/**
+	 * Closes this writer.
+	 *
+	 * @throws IOException if closing fails
+	 */
+	void close() throws IOException;
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java?view=diff&rev=475775&r1=475774&r2=475775
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
Thu Nov 16 07:37:35 2006
@@ -20,7 +20,6 @@
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -45,11 +44,13 @@
     private static final String NAME_PREFIX="data-";
     private final File dir;
     private final String name;
-    private StoreDataReader reader;
-    private StoreDataWriter writer;
+    private SyncDataFileReader reader;
+    private DataFileWriter writer;
     private DataFile currentWriteFile;
     private long maxFileLength = MAX_FILE_LENGTH;
     Map fileMap=new HashMap();
+    
+    private boolean useAsyncWriter=false;
 
     public static final int ITEM_HEAD_SIZE=5; // type + length
     public static final byte DATA_ITEM_TYPE=1;
@@ -61,8 +62,6 @@
     public DataManager(File dir, final String name){
         this.dir=dir;
         this.name=name;
-        this.reader=new StoreDataReader(this);
-        this.writer=new StoreDataWriter(this);
         
         dataFilePrefix = NAME_PREFIX+name+"-";
         // build up list of current dataFiles
@@ -107,34 +106,35 @@
             currentWriteFile=createAndAddDataFile(nextNum);
         }
         item.setOffset(currentWriteFile.getLength());
-        item.setFile(currentWriteFile.getNumber().intValue());
+        item.setFile(currentWriteFile.getNumber().intValue());        
+        currentWriteFile.incrementLength(item.getSize()+ITEM_HEAD_SIZE);
         return currentWriteFile;
     }
 
-    RandomAccessFile getDataFile(StoreLocation item) throws IOException{
+    DataFile getDataFile(StoreLocation item) throws IOException{
         Integer key=new Integer(item.getFile());
         DataFile dataFile=(DataFile) fileMap.get(key);
-        if(dataFile!=null){
-            return dataFile.getRandomAccessFile();
+        if(dataFile==null){
+            log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile());
         }
-        log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
-        throw new IOException("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile());
+        return dataFile;
     }
     
     public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws
IOException{
-        return reader.readItem(marshaller,item);
+        return getReader().readItem(marshaller,item);
     }
 
-    public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload)
throws IOException{
-        return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
+    public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
+        return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
     }
     
-    public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
-        return writer.storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
+    public StoreLocation storeRedoItem(Object payload) throws IOException{
+        return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
     }
     
-    public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object
payload) throws IOException {
-        writer.updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
+    public void updateItem(StoreLocation location,Marshaller marshaller, Object payload)
throws IOException {
+        getWriter().updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
     }
 
     public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
@@ -149,7 +149,7 @@
         while( true ) {
             byte type;
             try {
-                type = reader.readDataItemSize(item);
+                type = getReader().readDataItemSize(item);
             } catch (IOException ignore) {
                 log.trace("End of data file reached at (header was invalid): "+item);
                 return;
@@ -180,9 +180,10 @@
     }
     
     public synchronized void close() throws IOException{
+    	getWriter().close();
         for(Iterator i=fileMap.values().iterator();i.hasNext();){
             DataFile dataFile=(DataFile) i.next();
-            dataFile.force();
+            getWriter().force(dataFile);
             dataFile.close();
         }
         fileMap.clear();
@@ -191,7 +192,7 @@
     public synchronized void force() throws IOException{
         for(Iterator i=fileMap.values().iterator();i.hasNext();){
             DataFile dataFile=(DataFile) i.next();
-            dataFile.force();
+            getWriter().force(dataFile);
         }
     }
 
@@ -218,7 +219,7 @@
         }
     }
 
-    void addInterestInFile(DataFile dataFile){
+    synchronized void addInterestInFile(DataFile dataFile){
         if(dataFile!=null){
             dataFile.increment();
         }
@@ -287,4 +288,42 @@
     public String toString(){
         return "DataManager:("+NAME_PREFIX+name+")";
     }
+
+	public synchronized SyncDataFileReader getReader() {
+		if( reader == null ) {
+			reader = createReader();
+		}
+		return reader;
+	}
+	protected SyncDataFileReader createReader() {
+		return new SyncDataFileReader(this);
+	}
+	public synchronized void setReader(SyncDataFileReader reader) {
+		this.reader = reader;
+	}
+
+	public synchronized DataFileWriter getWriter() {
+		if( writer==null ) {
+			writer = createWriter();
+		}
+		return writer;
+	}
+	private DataFileWriter createWriter() {
+		if( useAsyncWriter ) {
+			return new AsyncDataFileWriter(this);
+		} else {
+			return new SyncDataFileWriter(this);
+		}
+	}
+	public synchronized void setWriter(DataFileWriter writer) {
+		this.writer = writer;
+	}
+
+	public synchronized boolean isUseAsyncWriter() {
+		return useAsyncWriter;
+	}
+
+	public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
+		this.useAsyncWriter = useAsyncWriter;
+	}
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?view=auto&rev=475775
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
Thu Nov 16 07:37:35 2006
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.util.DataByteArrayInputStream;
+/**
+ * Store reader that reads items synchronously from the data files of a
+ * DataManager.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class SyncDataFileReader{
+
+    private DataManager dataManager;
+    private DataByteArrayInputStream dataIn;
+
+    /**
+     * Construct a Store reader
+     *
+     * @param fileManager the data manager used to look up data files
+     */
+    SyncDataFileReader(DataManager fileManager){
+        dataIn=new DataByteArrayInputStream();
+        dataManager=fileManager;
+    }
+
+    /**
+     * Reads the item header at the item's offset, sets the size property on
+     * the DataItem and returns the type byte the item was written with.
+     *
+     * @param item the item whose file and offset identify the header to read
+     * @return the type byte read from the header
+     * @throws IOException if the data file cannot be located or read
+     */
+    protected byte readDataItemSize(DataItem item) throws IOException {
+        final DataFile dataFile = dataManager.getDataFile(item);
+        final RandomAccessFile raf = dataFile.getRandomAccessFile();
+
+        // The header is the type byte followed by the size field.
+        raf.seek(item.getOffset());
+        final byte itemType = raf.readByte();
+        final int payloadSize = raf.readInt();
+        item.setSize(payloadSize);
+        return itemType;
+    }
+
+    /**
+     * Reads the payload bytes of an item and unmarshals them.
+     *
+     * @param marshaller reads the payload from the bytes
+     * @param item the location (file, offset and size) of the item
+     * @return the object produced by the marshaller
+     * @throws IOException if the data file cannot be located or read
+     */
+    protected Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
+        final DataFile dataFile = dataManager.getDataFile(item);
+        final RandomAccessFile raf = dataFile.getRandomAccessFile();
+
+        // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
+        // allocating byte[] arrays on every readItem.
+        final byte[] payloadBytes = new byte[item.getSize()];
+        // Skip the header; only the payload is handed to the marshaller.
+        final long payloadOffset = item.getOffset()+DataManager.ITEM_HEAD_SIZE;
+        raf.seek(payloadOffset);
+        raf.readFully(payloadBytes);
+        dataIn.restart(payloadBytes);
+        return marshaller.readPayload(dataIn);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=auto&rev=475775
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
Thu Nov 16 07:37:35 2006
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+/**
+ * Optimized Store writer.  Synchronously marshalls and writes to the data file. Simple but
+ * may introduce a bit of contention when put under load.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class SyncDataFileWriter implements DataFileWriter{
+
+    private DataByteArrayOutputStream buffer;
+    private DataManager dataManager;
+
+    /**
+     * Construct a Store writer
+     *
+     * @param fileManager the data manager used to locate data files and to
+     *            find space for new items
+     */
+    SyncDataFileWriter(DataManager fileManager){
+        buffer=new DataByteArrayOutputStream();
+        dataManager=fileManager;
+    }
+
+    /**
+     * Marshals the payload, asks the data manager for a location and writes
+     * the item there before returning.
+     *
+     * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
+     */
+    public synchronized StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
+        final int size=marshalWithHeader(marshaller,payload,type);
+
+        // Find the position where this item will land at.
+        final DataItem item=new DataItem();
+        item.setSize(size-DataManager.ITEM_HEAD_SIZE);
+        final DataFile dataFile=dataManager.findSpaceForData(item);
+
+        // Now splat the buffer to the file.
+        dataFile.getRandomAccessFile().seek(item.getOffset());
+        dataFile.getRandomAccessFile().write(buffer.getData(),0,size);
+        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
+
+        dataManager.addInterestInFile(dataFile);
+        return item;
+    }
+
+    /**
+     * Marshals the payload and writes it at the offset of the given location
+     * before returning.
+     *
+     * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
+     */
+    public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
+        final int size=marshalWithHeader(marshaller,payload,type);
+
+        final DataFile dataFile = dataManager.getDataFile(location);
+        final RandomAccessFile file = dataFile.getRandomAccessFile();
+        file.seek(location.getOffset());
+        file.write(buffer.getData(),0,size);
+        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
+    }
+
+    /**
+     * Syncs the data file if its dirty marker is set, then clears the marker.
+     *
+     * @param dataFile the data file to sync
+     * @throws IOException if the sync fails
+     */
+    public synchronized void force(DataFile dataFile) throws IOException {
+        if( dataFile.getWriterData()==null ) {
+            // Dirty marker not set: nothing to sync.
+            return;
+        }
+        dataFile.getRandomAccessFile().getFD().sync();
+        dataFile.setWriterData(null);
+    }
+
+    /**
+     * Does nothing.
+     */
+    public void close() throws IOException {
+    }
+
+    /**
+     * Fills the shared buffer with the item header (type byte and payload
+     * length) followed by the marshalled payload. Called only from the
+     * synchronized write methods.
+     *
+     * @return the total number of bytes in the buffer, header included
+     */
+    private int marshalWithHeader(Marshaller marshaller, Object payload, byte type) throws IOException {
+        // Write the payload after a gap for the header..
+        buffer.reset();
+        buffer.position(DataManager.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload,buffer);
+        final int size=buffer.size();
+
+        // ..then rewind and fill in the header.
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(size-DataManager.ITEM_HEAD_SIZE);
+        return size;
+    }
+}



Mime
View raw message