activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r475943 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl: ./ data/
Date Thu, 16 Nov 2006 22:06:22 GMT
Author: chirino
Date: Thu Nov 16 14:06:22 2006
New Revision: 475943

URL: http://svn.apache.org/viewvc?view=rev&rev=475943
Log:
The async writer is now working and enabled by default.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/WriteKey.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=475943&r1=475942&r2=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
Thu Nov 16 14:06:22 2006
@@ -74,7 +74,7 @@
     private String mode;
     private boolean initialized;
     private boolean logIndexChanges=false;
-    private boolean useAsyncWriter=false;
+    private boolean useAsyncWriter=true;
     private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
     private FileLock lock;
     private String indexType=IndexTypes.DISK_INDEX;

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java?view=auto&rev=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
Thu Nov 16 14:06:22 2006
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.data.AsyncDataFileWriter.WriteCommand;
+import org.apache.activemq.kaha.impl.data.AsyncDataFileWriter.WriteKey;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayInputStream;
+/**
+ * Optimized Store reader
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class AsyncDataFileReader implements DataFileReader {
+    // static final Log log = LogFactory.getLog(AsyncDataFileReader.class);
+    
+    private DataManager dataManager;
+    private DataByteArrayInputStream dataIn;
+	private final Map inflightWrites;
+
+    /**
+     * Construct a Store reader
+     * 
+     * @param file
+     */
+    AsyncDataFileReader(DataManager fileManager, AsyncDataFileWriter writer){
+        this.dataManager=fileManager;
+		this.inflightWrites = writer.getInflightWrites();
+        this.dataIn=new DataByteArrayInputStream();
+    }
+
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
+	 */
+    public byte readDataItemSize(DataItem item) throws IOException {
+    	WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
+    	if( asyncWrite!= null ) {
+    		item.setSize(asyncWrite.location.getSize());
+    		return asyncWrite.data[0];
+    	}
+        RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+        byte rc;
+        synchronized(file) {
+	        file.seek(item.getOffset()); // jump to the size field
+	        rc = file.readByte();
+	        item.setSize(file.readInt());
+        }
+        return rc;
+    }
+    
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller,
org.apache.activemq.kaha.StoreLocation)
+	 */
+    public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
+    	WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
+    	if( asyncWrite!= null ) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(asyncWrite.data, DataManager.ITEM_HEAD_SIZE,
item.getSize());
+            return marshaller.readPayload(new DataInputStream(stream));    		
+    	}
+
+    	RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
+        
+        // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
+        // allocating byte[] arrays on every readItem.
+		byte[] data=new byte[item.getSize()];
+        synchronized(file) {
+			file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
+			file.readFully(data);
+        }
+		dataIn.restart(data);
+		return marshaller.readPayload(dataIn);
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java?view=diff&rev=475943&r1=475942&r2=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
Thu Nov 16 14:06:22 2006
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.RandomAccessFile;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.StoreLocation;
@@ -36,30 +37,60 @@
  * @version $Revision: 1.1.1.1 $
  */
 final class AsyncDataFileWriter implements DataFileWriter {
+//    static final Log log = LogFactory.getLog(AsyncDataFileWriter.class);
     
-	private static final Object SHUTDOWN_COMMAND = new Object();
+	private static final String SHUTDOWN_COMMAND = "SHUTDOWN";
+	
+	static public class WriteKey {
+	    private final int file;
+	    private final long offset;
+	    private final int hash;
+
+		public WriteKey(StoreLocation item){
+	    	file = item.getFile();
+	    	offset = item.getOffset();
+	    	// TODO: see if we can build a better hash  
+	    	hash = (int) (file  ^ offset);
+	    }
+	 
+	    public int hashCode() {
+	    	return hash;  
+	    }
 	    
-    static class WriteCommand {
-		final RandomAccessFile dataFile;
-		final byte[] data;
-		final long offset;
-		final int size;
-	    final CountDownLatch latch;
+	    public boolean equals(Object obj) {
+	    	WriteKey di = (WriteKey)obj;
+	    	return di.file == file && di.offset == offset;
+	    }
+	}
+
+    public static class WriteCommand {
+    	
+		public final StoreLocation location;
+		public final RandomAccessFile dataFile;
+		public final byte[] data;
+		public final CountDownLatch latch;
 
-		public WriteCommand(RandomAccessFile dataFile, byte[] data, long offset, int size, CountDownLatch
latch) {
+		public WriteCommand(StoreLocation location, RandomAccessFile dataFile, byte[] data, CountDownLatch
latch) {
+			this.location = location;
 			this.dataFile = dataFile;
 			this.data = data;
-			this.offset = offset;
-			this.size = size;
 			this.latch = latch;
 		}
 
+		public String toString() {
+			return "write: "+location+", latch = "+System.identityHashCode(latch);
+		}
     }
     
     private DataManager dataManager;
     
     private final Object enqueueMutex = new Object();
     private final LinkedList queue = new LinkedList();
+    
+    // Maps WriteKey -> WriteCommand for all the writes that still have not landed on

+    // disk.
+    private final ConcurrentHashMap inflightWrites = new ConcurrentHashMap();
+    
     private final UsageManager usage = new UsageManager();     
     private CountDownLatch latchAssignedToNewWrites = new CountDownLatch(1);
     
@@ -91,6 +122,7 @@
 				return;
 			}
 			latch.await();
+			
 		} catch (InterruptedException e) {
 			throw new InterruptedIOException();
 		}
@@ -103,7 +135,7 @@
      * @return
      * @throws IOException
      */
-    public StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws
IOException {
+    public DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException
{
     	// We may need to slow down if we are pounding the async thread too 
     	// hard..
     	try {
@@ -121,30 +153,31 @@
         buffer.reset();
         buffer.writeByte(type);
         buffer.writeInt(payloadSize);
-
         final DataItem item=new DataItem();
-        item.setSize(payloadSize);
-        
+        item.setSize(payloadSize);        
         usage.increaseUsage(size);
 
         // Locate datafile and enqueue into the executor in sychronized block so that 
         // writes get equeued onto the executor in order that they were assigned by 
         // the data manager (which is basically just appending)
+    	WriteCommand write; 
         synchronized(enqueueMutex) {
             // Find the position where this item will land at.
 	        final DataFile dataFile=dataManager.findSpaceForData(item);
 	        dataManager.addInterestInFile(dataFile);
         	dataFile.setWriterData(latchAssignedToNewWrites);
-	        enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), item.getOffset(),
size, latchAssignedToNewWrites));
+        	write = new WriteCommand(item, dataFile.getRandomAccessFile(), buffer.getData(),
latchAssignedToNewWrites);
+	        enqueue(write);
         }
-                
+        
+    	inflightWrites.put(new WriteKey(item), write);
         return item;
     }
     
     /**
      * 
      */
-    public void updateItem(final StoreLocation location,Marshaller marshaller, Object payload,
byte type) throws IOException {
+    public void updateItem(final DataItem item, Marshaller marshaller, Object payload, byte
type) throws IOException {
     	// We may need to slow down if we are pounding the async thread too 
     	// hard..
     	try {
@@ -161,19 +194,24 @@
         int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
         buffer.reset();
         buffer.writeByte(type);
-        buffer.writeInt(payloadSize);
-        final DataFile  dataFile = dataManager.getDataFile(location);
-                
+        buffer.writeInt(payloadSize);        
+        item.setSize(payloadSize);
+        final DataFile  dataFile = dataManager.getDataFile(item);                
+        
         usage.increaseUsage(size);
-
+        
+    	WriteCommand write = new WriteCommand(item, dataFile.getRandomAccessFile(), buffer.getData(),
latchAssignedToNewWrites);
+    	
         // Equeue the write to an async thread.
         synchronized(enqueueMutex) {
         	dataFile.setWriterData(latchAssignedToNewWrites);
-        	enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), location.getOffset(),
size, latchAssignedToNewWrites));
+        	enqueue(write);
         }
+    	inflightWrites.put(new WriteKey(item), write);
     }
 
     private void enqueue(Object command) throws IOException {
+    	
     	if( shutdown ) {
     		throw new IOException("Async Writter Thread Shutdown");
     	}
@@ -199,6 +237,7 @@
 	private Object dequeue() {
 		synchronized( enqueueMutex ) {
 			while( queue.isEmpty() ) {
+				inflightWrites.clear();
 				try {
 					enqueueMutex.wait();
 				} catch (InterruptedException e) {
@@ -247,14 +286,17 @@
      * 
      */
     private void processQueue() {
+//    	log.debug("Async thread startup");
     	try {
     		CountDownLatch currentBatchLatch=null;
     		RandomAccessFile currentBatchDataFile=null;
-	    	while( !isShutdown() ) {
+
+	    	while( true ) {
 	    		
 	    		// Block till we get a command.
 	    		Object o = dequeue();
-	    		
+//        		log.debug("Processing: "+o);
+	    			    		
 	        	if( o == SHUTDOWN_COMMAND ) {
 	        		if( currentBatchLatch!=null ) {
 	        			currentBatchDataFile.getFD().sync();
@@ -288,10 +330,14 @@
         			}
         			
         			// Write to the data..
-		        	write.dataFile.seek(write.offset);
-		        	write.dataFile.write(write.data,0,write.size);
-		        	usage.decreaseUsage(write.size);
-        			
+        			int size = write.location.getSize()+DataManager.ITEM_HEAD_SIZE;
+        			synchronized(write.dataFile) {
+			        	write.dataFile.seek(write.location.getOffset());
+			        	write.dataFile.write(write.data,0,size);
+        			}
+		        	inflightWrites.remove(new WriteKey(write.location));
+		        	usage.decreaseUsage(size);		        	
+		        	
 	        		// Start of a batch..
 	        		if( currentBatchLatch == null ) {
 	        			currentBatchLatch = write.latch;
@@ -301,14 +347,14 @@
         	        		// write commands allready in the queue should have the 
         	        		// same latch assigned.
         	        		latchAssignedToNewWrites = new CountDownLatch(1);
-        	        		// enqueue an end of batch indicator..
-        	        		queue.add(currentBatchLatch);
-        	          		enqueueMutex.notify();
+        	        		if( !shutdown ) {
+        	        			enqueue(currentBatchLatch);
+        	        		}
         	        	}
         	        	
 	        		} else if( currentBatchLatch!=write.latch ) { 
 	        			// the latch on subsequent writes should match.
-	        			new IOException("Got an out of sequence write.");
+	        			new IOException("Got an out of sequence write");
 	        		}
 	        	}
 	    	}
@@ -317,9 +363,15 @@
 	    	synchronized( enqueueMutex ) {
 	    		firstAsyncException = e;
 	    	}
+//			log.debug("Aync thread shutdown due to error: "+e,e);
 		} finally {
+//			log.debug("Aync thread shutdown");
     		shutdownDone.countDown();
     	}
     }
+
+	public synchronized ConcurrentHashMap getInflightWrites() {
+		return inflightWrites;
+	}
         
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java?view=auto&rev=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java
Thu Nov 16 14:06:22 2006
@@ -0,0 +1,24 @@
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+
+interface DataFileReader {
+
+	/**
+	 * Sets the size property on a DataItem and returns the type of item that this was 
+	 * created as.
+	 * 
+	 * @param marshaller
+	 * @param item
+	 * @return
+	 * @throws IOException
+	 */
+	byte readDataItemSize(DataItem item) throws IOException;
+
+	Object readItem(Marshaller marshaller, StoreLocation item)
+			throws IOException;
+
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java?view=diff&rev=475943&r1=475942&r2=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
Thu Nov 16 14:06:22 2006
@@ -4,7 +4,6 @@
 import java.io.IOException;
 
 import org.apache.activemq.kaha.Marshaller;
-import org.apache.activemq.kaha.StoreLocation;
 
 interface DataFileWriter {
 
@@ -16,10 +15,10 @@
 	 * @throws IOException
 	 * @throws FileNotFoundException
 	 */
-	public StoreLocation storeItem(Marshaller marshaller, Object payload,
+	public DataItem storeItem(Marshaller marshaller, Object payload,
 			byte type) throws IOException;
 
-	public void updateItem(StoreLocation location, Marshaller marshaller,
+	public void updateItem(DataItem item, Marshaller marshaller,
 			Object payload, byte type) throws IOException;
 
 	public void force(DataFile dataFile) throws IOException;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java?view=diff&rev=475943&r1=475942&r2=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
Thu Nov 16 14:06:22 2006
@@ -44,7 +44,7 @@
     private static final String NAME_PREFIX="data-";
     private final File dir;
     private final String name;
-    private SyncDataFileReader reader;
+    private DataFileReader reader;
     private DataFileWriter writer;
     private DataFile currentWriteFile;
     private long maxFileLength = MAX_FILE_LENGTH;
@@ -134,7 +134,7 @@
     }
     
     public void updateItem(StoreLocation location,Marshaller marshaller, Object payload)
throws IOException {
-        getWriter().updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
+        getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
     }
 
     public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
@@ -289,16 +289,20 @@
         return "DataManager:("+NAME_PREFIX+name+")";
     }
 
-	public synchronized SyncDataFileReader getReader() {
+	public synchronized DataFileReader getReader() {
 		if( reader == null ) {
 			reader = createReader();
 		}
 		return reader;
 	}
-	protected SyncDataFileReader createReader() {
-		return new SyncDataFileReader(this);
+	protected DataFileReader createReader() {
+		if( useAsyncWriter ) {
+			return new AsyncDataFileReader(this, (AsyncDataFileWriter) getWriter());
+		} else {
+			return new SyncDataFileReader(this);
+		}
 	}
-	public synchronized void setReader(SyncDataFileReader reader) {
+	public synchronized void setReader(DataFileReader reader) {
 		this.reader = reader;
 	}
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?view=diff&rev=475943&r1=475942&r2=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
Thu Nov 16 14:06:22 2006
@@ -27,7 +27,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-final class SyncDataFileReader{
+final class SyncDataFileReader implements DataFileReader {
     
     private DataManager dataManager;
     private DataByteArrayInputStream dataIn;
@@ -42,17 +42,10 @@
         this.dataIn=new DataByteArrayInputStream();
     }
 
-    /**
-     * Sets the size property on a DataItem and returns the type of item that this was 
-     * created as.
-     * 
-     * @param marshaller
-     * @param item
-     * @return
-     * @throws IOException
-     */
-    protected byte readDataItemSize(DataItem item) throws IOException {
-
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
+	 */
+    public byte readDataItemSize(DataItem item) throws IOException {
         RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
         file.seek(item.getOffset()); // jump to the size field
         byte rc = file.readByte();
@@ -60,7 +53,10 @@
         return rc;
     }
     
-    protected Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller,
org.apache.activemq.kaha.StoreLocation)
+	 */
+    public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
         RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
         
         // TODO: we could reuse the buffer in dataIn if it's big enough to avoid

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=diff&rev=475943&r1=475942&r2=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
Thu Nov 16 14:06:22 2006
@@ -48,7 +48,7 @@
     /* (non-Javadoc)
 	 * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller,
java.lang.Object, byte)
 	 */
-    public synchronized StoreLocation storeItem(Marshaller marshaller, Object payload, byte
type) throws IOException {
+    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
throws IOException {
         
         // Write the packet our internal buffer.
         buffer.reset();
@@ -77,7 +77,7 @@
     /* (non-Javadoc)
 	 * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation,
org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
 	 */
-    public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object
payload, byte type) throws IOException {
+    public synchronized void updateItem(DataItem item,Marshaller marshaller, Object payload,
byte type) throws IOException {
         //Write the packet our internal buffer.
         buffer.reset();
         buffer.position(DataManager.ITEM_HEAD_SIZE);
@@ -87,9 +87,10 @@
         buffer.reset();
         buffer.writeByte(type);
         buffer.writeInt(payloadSize);
-        DataFile  dataFile = dataManager.getDataFile(location);
+        item.setSize(payloadSize);
+        DataFile  dataFile = dataManager.getDataFile(item);
         RandomAccessFile file = dataFile.getRandomAccessFile();
-        file.seek(location.getOffset());
+        file.seek(item.getOffset());
         file.write(buffer.getData(),0,size);
         dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/WriteKey.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/WriteKey.java?view=auto&rev=475943
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/WriteKey.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/WriteKey.java
Thu Nov 16 14:06:22 2006
@@ -0,0 +1,22 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import org.apache.activemq.kaha.StoreLocation;
+
+



Mime
View raw message