activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r409297 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha: ./ impl/
Date Thu, 25 May 2006 03:46:35 GMT
Author: chirino
Date: Wed May 24 20:46:35 2006
New Revision: 409297

URL: http://svn.apache.org/viewvc?rev=409297&view=rev
Log:
 - Added some of the initial infrastructure needed to be able to use the DataManager to do
index file recovery.
 - ObjectMarshaller now avoids doing byte[] allocations.
 - We now store the data item's size in the index file so that when we load data from the
data file using a
   DataItem object, the data payload can be read in one I/O operation; previously an extra I/O
was needed to find
   out the size of the payload.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
Wed May 24 20:46:35 2006
@@ -13,13 +13,13 @@
  */
 package org.apache.activemq.kaha;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 /**
  * Implementation of a Marshaller for Objects
  * 
@@ -34,13 +34,20 @@
      * @throws IOException
      */
     public void writePayload(Object object,DataOutput dataOut) throws IOException{
-        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-        ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
+
+// I failed to see why we just did not just used the provided stream directly        
+//        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+//        ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
+//        objectOut.writeObject(object);
+//        objectOut.close();
+//        byte[] data = bytesOut.toByteArray();
+//        dataOut.writeInt(data.length);
+//        dataOut.write(data);
+        
+        ObjectOutputStream objectOut=new ObjectOutputStream((OutputStream) dataOut);
         objectOut.writeObject(object);
-        objectOut.close();
-        byte[] data = bytesOut.toByteArray();
-        dataOut.writeInt(data.length);
-        dataOut.write(data);
+        objectOut.reset();
+        objectOut.flush();
     }
 
     /**
@@ -51,15 +58,25 @@
      * @throws IOException
      */
     public Object readPayload(DataInput dataIn) throws IOException{
-        int size = dataIn.readInt();
-        byte[] data = new byte[size];
-        dataIn.readFully(data);
-        ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
-        ObjectInputStream objectIn=new ObjectInputStream(bytesIn);
+        
+// I failed to see why we just did not just used the provided stream directly        
+//        int size = dataIn.readInt();
+//        byte[] data = new byte[size];
+//        dataIn.readFully(data);
+//        ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
+//        ObjectInputStream objectIn=new ObjectInputStream(bytesIn);
+//        try{
+//            return objectIn.readObject();
+//        }catch(ClassNotFoundException e){
+//            throw new IOException(e.getMessage());
+//        }
+        
+        ObjectInputStream objectIn=new ObjectInputStream((InputStream) dataIn);
         try{
             return objectIn.readObject();
-        }catch(ClassNotFoundException e){
+        } catch(ClassNotFoundException e) {
             throw new IOException(e.getMessage());
         }
+
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
Wed May 24 20:46:35 2006
@@ -13,51 +13,29 @@
  */
 package org.apache.activemq.kaha.impl;
 
-import org.apache.activemq.kaha.Marshaller;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
 /**
  * A a wrapper for a data in the store
  * 
  * @version $Revision: 1.2 $
  */
 final class DataItem implements Item{
-    static final int HEAD_SIZE=6; // magic + len
-    private int size;
-    private long offset=POSITION_NOT_SET;
+    
     private int file=(int) POSITION_NOT_SET;
+    private long offset=POSITION_NOT_SET;
+    private int size;
 
     DataItem(){}
     
+    DataItem(DataItem item) {
+        this.file = item.file;
+        this.offset = item.offset;
+        this.size = item.size;
+    }
+    
     boolean isValid(){
         return file != POSITION_NOT_SET;
     }
 
-    void writeHeader(DataOutput dataOut) throws IOException{
-        dataOut.writeShort(MAGIC);
-        dataOut.writeInt(size);
-    }
-
-    void readHeader(DataInput dataIn) throws IOException{
-        int magic=dataIn.readShort();
-        if(magic==MAGIC){
-            size=dataIn.readInt();
-        }else{
-            throw new BadMagicException("Unexpected Magic value: "+magic);
-        }
-    }
-
-    void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws
IOException{
-        marshaller.writePayload(object,dataOut);
-    }
-
-    Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
-        return marshaller.readPayload(dataIn);
-    }
-
     /**
      * @return Returns the size.
      */
@@ -106,5 +84,9 @@
     public String toString(){
         String result="offset = "+offset+", file = " + file + ", size = "+size;
         return result;
+    }
+
+    public DataItem copy() {
+        return new DataItem(this);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
Wed May 24 20:46:35 2006
@@ -22,7 +22,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 /**
@@ -40,6 +42,10 @@
     private DataFile currentWriteFile;
     Map fileMap=new HashMap();
 
+    public static final int ITEM_HEAD_SIZE=5; // type + length
+    public static final byte DATA_ITEM_TYPE=1;
+    public static final byte REDO_ITEM_TYPE=2;
+
     DataManager(File dir,String pf){
         this.dir=dir;
         this.prefix=pf;
@@ -91,15 +97,56 @@
         }
         throw new IOException("Could not locate data file "+prefix+item.getFile());
     }
-
-    synchronized Object readItem(Marshaller marshaller,DataItem item) throws IOException{
+    
+    synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{
         return reader.readItem(marshaller,item);
     }
 
-    synchronized DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
-        return writer.storeItem(marshaller,payload);
+    synchronized DataItem storeDataItem(Marshaller marshaller, Object payload) throws IOException{
+        return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
+    }
+    
+    synchronized DataItem storeRedoItem(Marshaller marshaller, Object payload) throws IOException{
+        return writer.storeItem(marshaller,payload, REDO_ITEM_TYPE);
     }
 
+    synchronized void recoverRedoItems(Marshaller marshaller, RedoListener listener) throws
IOException{
+        DataItem item = new DataItem();
+        item.setFile(currentWriteFile.getNumber().intValue());
+        item.setOffset(0);
+        while( true ) {
+            byte type;
+            try {
+                type = reader.readDataItemSize(item);
+            } catch (IOException ignore) {
+                log.trace("End of data file reached at (header was invalid): "+item);
+                return;
+            }
+            if( type == REDO_ITEM_TYPE ) {
+                // Un-marshal the redo item
+                Object object;
+                try {
+                    object = readItem(marshaller, item);
+                } catch (IOException e1) {
+                    log.trace("End of data file reached at (payload was invalid): "+item);
+                    return;
+                }
+                try {
+                    
+                    listener.onRedoItem(item, object);
+                    // in case the listener is holding on to item references, copy it
+                    // so we don't change it behind the listener's back.
+                    item = item.copy();
+                    
+                } catch (Exception e) {
+                    throw IOExceptionSupport.create("Recovery handler failed: "+e,e);
+                }
+            }
+            // Move to the next item.
+            item.setOffset(item.getOffset()+ITEM_HEAD_SIZE+item.getSize());
+        }
+    }
+    
     synchronized void close() throws IOException{
         for(Iterator i=fileMap.values().iterator();i.hasNext();){
             DataFile dataFile=(DataFile) i.next();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java
Wed May 24 20:46:35 2006
@@ -17,13 +17,13 @@
 import java.io.DataOutput;
 import java.io.IOException;
 /**
- * A an Item with a relative postion and location to other Items in the Store
+ * A an Item with a relative position and location to other Items in the Store
  * 
  * @version $Revision: 1.2 $
  */
 final class IndexItem implements Item{
     
-    static final int INDEX_SIZE=43;
+    static final int INDEX_SIZE=51;
     //used by linked list
     IndexItem next;
     IndexItem prev;
@@ -31,13 +31,17 @@
     private long offset=POSITION_NOT_SET;
     private long previousItem=POSITION_NOT_SET;
     private long nextItem=POSITION_NOT_SET;
+    private boolean active=true;
+    
+    // TODO: consider just using a DataItem for the following fields.
     private long keyOffset=POSITION_NOT_SET;
     private int keyFile=(int) POSITION_NOT_SET;
+    private int keySize=0;
+    
     private long valueOffset=POSITION_NOT_SET;
     private int valueFile=(int) POSITION_NOT_SET;
-    private boolean active=true;
+    private int valueSize=0;
     
-
     /**
      * Default Constructor
      */
@@ -48,8 +52,10 @@
         nextItem=POSITION_NOT_SET;
         keyOffset=POSITION_NOT_SET;
         keyFile=(int) POSITION_NOT_SET;
+        keySize=0;
         valueOffset=POSITION_NOT_SET;
         valueFile=(int) POSITION_NOT_SET;
+        valueSize=0;        
         active=true;
     }
 
@@ -57,6 +63,7 @@
         DataItem result=new DataItem();
         result.setOffset(keyOffset);
         result.setFile(keyFile);
+        result.setSize(keySize);
         return result;
     }
 
@@ -64,17 +71,20 @@
         DataItem result=new DataItem();
         result.setOffset(valueOffset);
         result.setFile(valueFile);
+        result.setSize(valueSize);
         return result;
     }
 
     void setValueData(DataItem item){
         valueOffset=item.getOffset();
         valueFile=item.getFile();
+        valueSize=item.getSize();
     }
 
     void setKeyData(DataItem item){
         keyOffset=item.getOffset();
         keyFile=item.getFile();
+        keySize=item.getSize();
     }
 
     /**
@@ -88,8 +98,10 @@
         dataOut.writeLong(nextItem);
         dataOut.writeInt(keyFile);
         dataOut.writeLong(keyOffset);
+        dataOut.writeInt(keySize);
         dataOut.writeInt(valueFile);
         dataOut.writeLong(valueOffset);
+        dataOut.writeInt(valueSize);
     }
 
     /**
@@ -105,8 +117,10 @@
         nextItem=dataIn.readLong();
         keyFile=dataIn.readInt();
         keyOffset=dataIn.readLong();
+        keySize=dataIn.readInt();
         valueFile=dataIn.readInt();
         valueOffset=dataIn.readLong();
+        valueSize=dataIn.readInt();
     }
 
     /**
@@ -221,12 +235,31 @@
         this.offset=offset;
     }
 
+    public int getKeySize() {
+        return keySize;
+    }
+
+    public void setKeySize(int keySize) {
+        this.keySize = keySize;
+    }
+
+    public int getValueSize() {
+        return valueSize;
+    }
+
+    public void setValueSize(int valueSize) {
+        this.valueSize = valueSize;
+    }
+
     /**
-     * @return eprtty print of 'this'
+     * @return print of 'this'
      */
     public String toString(){
-        String result="offset="+offset+" , keyFile = "+keyFile+" , keyOffset = "+keyOffset+",
valueOffset = "
-                        +valueOffset+" , previousItem = "+previousItem+" , nextItem = "+nextItem;
+        String result="offset="+offset+
+        ", key=("+keyFile+", "+keyOffset+", "+keySize+")"+
+        ", value=("+valueFile+", "+valueOffset+", "+valueSize+")"+
+        ", previousItem="+previousItem+", nextItem="+nextItem
+        ;
         return result;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
Wed May 24 20:46:35 2006
@@ -66,7 +66,7 @@
             removeRoot(key);
         }
         
-        DataItem data = dataManager.storeItem(rootMarshaller, key);
+        DataItem data = dataManager.storeDataItem(rootMarshaller, key);
         IndexItem index = indexManager.createNewIndex();
         index.setKeyData(data);
         IndexItem newRoot = indexManager.createNewIndex();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
Wed May 24 20:46:35 2006
@@ -641,7 +641,7 @@
         IndexItem index=null;
         try{
             if(value!=null){
-                DataItem data=dataManager.storeItem(marshaller,value);
+                DataItem data=dataManager.storeDataItem(marshaller,value);
                 index=indexManager.createNewIndex();
                 index.setValueData(data);
                 IndexItem prev=list.getLast();
@@ -668,7 +668,7 @@
         IndexItem index=null;
         try{
             if(value!=null){
-                DataItem data=dataManager.storeItem(marshaller,value);
+                DataItem data=dataManager.storeDataItem(marshaller,value);
                 index=indexManager.createNewIndex();
                 index.setValueData(data);
                 IndexItem prev=root;
@@ -695,7 +695,7 @@
         IndexItem index=null;
         try{
             if(value!=null){
-                DataItem data=dataManager.storeItem(marshaller,value);
+                DataItem data=dataManager.storeDataItem(marshaller,value);
                 index=indexManager.createNewIndex();
                 index.setValueData(data);
                 IndexItem prev=null;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
Wed May 24 20:46:35 2006
@@ -342,11 +342,11 @@
         try{
             if(key!=null){
                 index=indexManager.createNewIndex();
-                DataItem data=dataManager.storeItem(keyMarshaller,key);
+                DataItem data=dataManager.storeDataItem(keyMarshaller,key);
                 index.setKeyData(data);
             }
             if(value!=null){
-                DataItem data=dataManager.storeItem(valueMarshaller,value);
+                DataItem data=dataManager.storeDataItem(valueMarshaller,value);
                 index.setValueData(data);
             }
             IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java?rev=409297&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java
Wed May 24 20:46:35 2006
@@ -0,0 +1,7 @@
+package org.apache.activemq.kaha.impl;
+
+public interface RedoListener {
+
+    void onRedoItem(DataItem item, Object object) throws Exception;
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
Wed May 24 20:46:35 2006
@@ -22,9 +22,9 @@
  * @version $Revision: 1.1.1.1 $
  */
 final class StoreDataReader{
+    
     private DataManager dataManager;
     private StoreByteArrayInputStream dataIn;
-    private byte[] header=new byte[DataItem.HEAD_SIZE];
 
     /**
      * Construct a Store reader
@@ -36,13 +36,31 @@
         this.dataIn=new StoreByteArrayInputStream();
     }
 
+    /**
+     * Sets the size property on a DataItem and returns the type of item that this was 
+     * created as.
+     * 
+     * @param marshaller
+     * @param item
+     * @return
+     * @throws IOException
+     */
+    protected byte readDataItemSize(DataItem item) throws IOException {
+
+        RandomAccessFile file = dataManager.getDataFile(item);
+        file.seek(item.getOffset()); // jump to the size field
+        byte rc = file.readByte();
+        item.setSize(file.readInt());
+        return rc;
+    }
+    
     protected Object readItem(Marshaller marshaller,DataItem item) throws IOException{
         RandomAccessFile file=dataManager.getDataFile(item);
-        file.seek(item.getOffset());
-        file.readFully(header);
-        dataIn.restart(header);
-        item.readHeader(dataIn);
+        
+        // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
+        // allocating byte[] arrays on every readItem.
         byte[] data=new byte[item.getSize()];
+        file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
         file.readFully(data);
         dataIn.restart(data);
         return marshaller.readPayload(dataIn);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java?rev=409297&r1=409296&r2=409297&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java
Wed May 24 20:46:35 2006
@@ -18,7 +18,9 @@
  */
 package org.apache.activemq.kaha.impl;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+
 import org.apache.activemq.kaha.Marshaller;
 /**
  * Optimized Store writer
@@ -26,9 +28,11 @@
  * @version $Revision: 1.1.1.1 $
  */
 final class StoreDataWriter{
-    private StoreByteArrayOutputStream dataOut;
+    
+    private StoreByteArrayOutputStream buffer;
     private DataManager dataManager;
 
+
     /**
      * Construct a Store writer
      * 
@@ -36,23 +40,39 @@
      */
     StoreDataWriter(DataManager fileManager){
         this.dataManager=fileManager;
-        this.dataOut=new StoreByteArrayOutputStream();
+        this.buffer=new StoreByteArrayOutputStream();
     }
 
-    DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
-        dataOut.reset();
-        dataOut.position(DataItem.HEAD_SIZE);
-        marshaller.writePayload(payload,dataOut);
-        int size=dataOut.size();
-        int payloadSize=size-DataItem.HEAD_SIZE;
+    /**
+     * @param marshaller
+     * @param payload
+     * @param data_item2 
+     * @return
+     * @throws IOException
+     * @throws FileNotFoundException
+     */
+    DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException
{
+        
+        // Write the packet our internal buffer.
+        buffer.reset();
+        buffer.position(DataManager.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload,buffer);
+        int size=buffer.size();
+        int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(payloadSize);
+
+        // Find the position where this item will land at.
         DataItem item=new DataItem();
         item.setSize(payloadSize);
         DataFile dataFile=dataManager.findSpaceForData(item);
-        dataOut.reset();
-        item.writeHeader(dataOut);
+        
+        // Now splat the buffer to the file.
         dataFile.getRandomAccessFile().seek(item.getOffset());
-        dataFile.getRandomAccessFile().write(dataOut.getData(),0,size);
+        dataFile.getRandomAccessFile().write(buffer.getData(),0,size);
         dataFile.incrementLength(size);
+        
         dataManager.addInterestInFile(dataFile);
         return item;
     }



Mime
View raw message