activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [6/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ activ...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileManager.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileManager.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileManager.java Tue Feb 21 15:12:56 2006
@@ -1,530 +1,530 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.text.NumberFormat;
-import java.util.HashMap;
-
-import org.apache.activeio.adapter.PacketOutputStream;
-import org.apache.activeio.adapter.PacketToInputStream;
-import org.apache.activeio.journal.InvalidRecordLocationException;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.Packet;
-
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Provides a logical view of many separate files as one single long log file.
- * The separate files that compose the LogFile are Segments of the LogFile.
- * <p/>This class is not thread safe.
- * 
- * @version $Revision: 1.1 $
- */
-final public class LogFileManager {
-
-    public static final int DEFAULT_LOGFILE_COUNT = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultLogFileCount", ""+(2)));
-    public static final int DEFAULT_LOGFILE_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultLogFileSize", ""+(1024*1024*20)));
-
-    static final public int SERIALIZED_SIZE = 6+Location.SERIALIZED_SIZE;
-
-    static final public byte DATA_RECORD_TYPE = 1;
-    static final public byte MARK_RECORD_TYPE = 2;
-    static final private NumberFormat onlineLogNameFormat = NumberFormat.getNumberInstance();
-    static {
-        onlineLogNameFormat.setMinimumIntegerDigits(3);
-        onlineLogNameFormat.setMaximumIntegerDigits(3);
-        onlineLogNameFormat.setGroupingUsed(false);
-        onlineLogNameFormat.setParseIntegerOnly(true);
-        onlineLogNameFormat.setMaximumFractionDigits(0);
-    }
-    
-    static final private NumberFormat archiveLogNameFormat = NumberFormat.getNumberInstance();
-    static {
-        archiveLogNameFormat.setMinimumIntegerDigits(8);
-        archiveLogNameFormat.setMaximumIntegerDigits(8);
-        archiveLogNameFormat.setGroupingUsed(false);
-        archiveLogNameFormat.setParseIntegerOnly(true);
-        archiveLogNameFormat.setMaximumFractionDigits(0);
-    }
-
-    // Config
-    private final File logDirectory;
-    private final int initialLogFileSize;
-    private final int onlineLogFileCount;
-    private final AtomicInteger activeLogFileCount = new AtomicInteger(0);
-    
-    // Keeps track of the online log file.
-    private LogFileNode firstNode;
-    private LogFileNode firstActiveNode;
-    private LogFileNode firstInactiveNode;
-    private LogFileNode appendNode;
-
-    private ControlFile controlFile;
-    private int lastLogFileId = -1;
-    private Location lastMark;
-    private boolean disposed;
-    private boolean loadedFromCleanShutDown;
-    
-    private File archiveDirectory;
-    HashMap openArchivedLogs = new HashMap();
-
-    public LogFileManager(File logDirectory) throws IOException {
-        this(logDirectory, DEFAULT_LOGFILE_COUNT, DEFAULT_LOGFILE_SIZE, null);
-    }
-
-    public LogFileManager(File logDirectory, int onlineLogFileCount, int initialLogFileSize, File archiveDirectory) throws IOException {
-        this.logDirectory = logDirectory;
-        this.onlineLogFileCount = onlineLogFileCount;
-        this.initialLogFileSize = initialLogFileSize;
-        initialize(onlineLogFileCount);
-        this.archiveDirectory=archiveDirectory;
-    }
-
-    void initialize(int onlineLogFileCount) throws IOException {
-
-        LogFileNode logFiles[] = new LogFileNode[onlineLogFileCount];
-
-        // Create the log directory if it does not exist.
-        if (!logDirectory.exists()) {
-            if (!logDirectory.mkdirs()) {
-                throw new IOException("Could not create directory: " + logDirectory);
-            }
-        }
-
-        // Open the control file.        
-        int controlDataSize = SERIALIZED_SIZE + (LogFileNode.SERIALIZED_SIZE*onlineLogFileCount);
-        controlFile = new ControlFile(new File(logDirectory, "control.dat"),  controlDataSize);
-        // Make sure we are the only process using the control file.
-        controlFile.lock();
-        
-        // Initialize the nodes.
-        for (int i = 0; i < onlineLogFileCount; i++) {
-            LogFile file = new LogFile(new File(logDirectory, "log-" + onlineLogNameFormat.format(i) + ".dat"),
-                    initialLogFileSize);
-            logFiles[i] = new LogFileNode(file);
-        }
-
-        // Link the nodes together.
-        for (int i = 0; i < onlineLogFileCount; i++) {
-            if (i == (onlineLogFileCount - 1)) {
-                logFiles[i].setNext(logFiles[0]);
-            } else {
-                logFiles[i].setNext(logFiles[i + 1]);
-            }
-        }
-        
-        firstNode = logFiles[0];
-        loadState();
-
-        // Find the first active node
-        for (int i = 0; i < onlineLogFileCount; i++) {
-            if( logFiles[i].isActive() ) {
-                if( firstActiveNode == null || logFiles[i].getId() < firstActiveNode.getId() ) {
-                    firstActiveNode = logFiles[i];
-                }
-            }
-        }
-        
-        // None was active? activate one.
-        if ( firstActiveNode == null ) {
-            firstInactiveNode = logFiles[0];
-            activateNextLogFile();
-        } else {            
-            // Find the append log and the first inactive node
-            firstInactiveNode = null;
-            LogFileNode log = firstActiveNode;
-            do {
-                if( !log.isActive() ) {
-                    firstInactiveNode = log;
-                    break;
-                } else {
-                    appendNode = log;
-                }
-                log = log.getNext();
-            } while (log != firstActiveNode);
-        }
-        
-        // If we did not have a clean shut down then we have to check the state 
-        // of the append log.
-        if( !this.loadedFromCleanShutDown ) {
-            checkAppendLog();
-        }
-                    
-        loadedFromCleanShutDown = false;
-        storeState();
-    }
-
-    private void checkAppendLog() throws IOException {
-        
-        // We are trying to get the true append offset and the last Mark that was written in 
-        // the append log.
-        
-        int offset = 0;
-        Record record = new Record();
-        LogFile logFile = appendNode.getLogFile();
-        Location markLocation=null;
-        
-        while( logFile.loadAndCheckRecord(offset, record) ) {
-            
-            if( record.getLocation().getLogFileId()!= appendNode.getId() || record.getLocation().getLogFileOffset()!=offset ) {
-                // We must have run past the end of the append location.
-                break;
-            }
-
-            if ( record.getRecordType()==LogFileManager.MARK_RECORD_TYPE) {
-                markLocation = record.getLocation();
-            }
-            
-            offset += record.getRecordLength();            
-        }
-        
-        appendNode.setAppendOffset(offset);
-        
-        if( markLocation!=null ) {
-            try {
-                Packet packet = readPacket(markLocation);
-                markLocation = Location.readFromPacket(packet);
-            } catch (InvalidRecordLocationException e) {
-                throw (IOException)new IOException(e.getMessage()).initCause(e);
-            }
-            updateMark(markLocation);
-        }
-        
-    }
-
-    private void storeState() throws IOException {
-        Packet controlData = controlFile.getControlData();
-        if( controlData.remaining() == 0 )
-            return;
-        
-        DataOutput data = new DataOutputStream(new PacketOutputStream(controlData));
-
-        data.writeInt(lastLogFileId);
-        data.writeBoolean(lastMark!=null);
-        if( lastMark!=null )
-            lastMark.writeToDataOutput(data);
-        data.writeBoolean(loadedFromCleanShutDown);
-        
-        // Load each node's state
-        LogFileNode log = firstNode;
-        do {            
-            log.writeExternal( data );
-            log = log.getNext();
-        } while (log != firstNode);
-        
-        controlFile.store();
-    }
-
-    private void loadState() throws IOException {
-        if( controlFile.load() ) {
-            Packet controlData = controlFile.getControlData();
-            if( controlData.remaining() == 0 )
-                return;
-            
-            DataInput data = new DataInputStream(new PacketToInputStream(controlData));
-    
-            lastLogFileId =data.readInt();
-            if( data.readBoolean() )
-                lastMark = Location.readFromDataInput(data);
-            else
-                lastMark = null;
-            loadedFromCleanShutDown = data.readBoolean();
-    
-            // Load each node's state
-            LogFileNode log = firstNode;
-            do {            
-                log.readExternal( data );
-                log = log.getNext();
-            } while (log != firstNode);
-        }
-    }
-
-    public void dispose() {
-
-        if (disposed)
-            return;
-        this.disposed = true;
-        
-        try {
-	        // Close all the opened log files.
-	        LogFileNode log = firstNode;
-	        do {
-	            log.getLogFile().dispose();
-	            log = log.getNext();
-	        } while (log != firstNode);
-	        
-	        loadedFromCleanShutDown=true;
-	        storeState();
-	        controlFile.dispose();
-        } catch ( IOException e ) {        	
-        }
-        
-    }
-
-    private int getNextLogFileId() {
-        return ++lastLogFileId;
-    }
-
-    /**
-     * @param write
-     * @throws IOException
-     */
-    public void append(BatchedWrite write) throws IOException {
-
-        if (!appendNode.isActive())
-            throw new IllegalStateException("Log file is not active.  Writes are not allowed");
-        if (appendNode.isReadOnly())
-            throw new IllegalStateException("Log file has been marked Read Only.  Writes are not allowed");
-
-        // Write and force the data to disk.
-        LogFile logFile = appendNode.getLogFile();
-        ByteBuffer buffer = ((ByteBufferPacket)write.getPacket().getAdapter(ByteBufferPacket.class)).getByteBuffer();
-        int size = buffer.remaining();
-        logFile.write(appendNode.getAppendOffset(), buffer);
-        if( write.getForce() )
-            logFile.force();
-
-        // Update state
-        appendNode.appended(size);
-        if (write.getMark() != null) {
-            updateMark(write.getMark());
-        }
-    }
-
-    /**
-     * @param write
-     * @throws IOException
-     */
-    synchronized private void updateMark(Location mark) throws IOException {
-        // If we wrote a mark we may need to deactivate some log files.
-        this.lastMark = mark;
-        while (firstActiveNode != appendNode) {
-            if (firstActiveNode.getId() < lastMark.getLogFileId()) {
-                
-                if( archiveDirectory!=null ) {
-                    File file = getArchiveFile(firstActiveNode.getId());
-                    firstActiveNode.getLogFile().copyTo(file);
-                }
-                
-                firstActiveNode.deactivate();
-                activeLogFileCount.decrementAndGet();
-                if( firstInactiveNode == null )
-                    firstInactiveNode = firstActiveNode;
-                firstActiveNode = firstActiveNode.getNextActive();
-                
-            } else {
-                break;
-            }
-        }
-    }
-    
-    private File getArchiveFile(int logId) {
-        return  new File(archiveDirectory, "" + archiveLogNameFormat.format(logId) + ".log");
-    }
-    
-    RecordInfo readRecordInfo(Location location) throws IOException, InvalidRecordLocationException {
-
-        LogFile logFile;
-        LogFileNode logFileState = getLogFileWithId(location.getLogFileId());
-        if( logFileState !=null ) {
-            // There can be no record at the append offset.
-            if (logFileState.getAppendOffset() == location.getLogFileOffset()) {
-                throw new InvalidRecordLocationException("No record at (" + location
-                        + ") found.  Location past end of logged data.");
-            }
-            logFile = logFileState.getLogFile();
-        } else {
-            if( archiveDirectory==null ) {
-                throw new InvalidRecordLocationException("Log file: " + location.getLogFileId() + " is not active.");
-            } else {
-                logFile = getArchivedLogFile(location.getLogFileId());
-            }
-        }
-
-        // Is there a record header at the seeked location?
-        try {
-            Record header = new Record();
-            logFile.readRecordHeader(location.getLogFileOffset(), header);
-            return new RecordInfo(location, header, logFileState, logFile);
-        } catch (IOException e) {
-            throw new InvalidRecordLocationException("No record at (" + location + ") found.");
-        }
-    }
-
-    private LogFile getArchivedLogFile(int logFileId) throws InvalidRecordLocationException, IOException {
-        Integer key = new Integer(logFileId);
-        LogFile rc = (LogFile) openArchivedLogs.get(key);
-        if( rc == null ) {
-            File archiveFile = getArchiveFile(logFileId);
-            if( !archiveFile.canRead() )
-                throw new InvalidRecordLocationException("Log file: " + logFileId + " does not exist.");
-            rc = new LogFile(archiveFile, getInitialLogFileSize());
-            openArchivedLogs.put(key, rc);
-            
-            // TODO: turn openArchivedLogs into LRU cache and close old log files.
-        }
-        return rc;
-    }
-
-    LogFileNode getLogFileWithId(int logFileId) throws InvalidRecordLocationException {
-        for (LogFileNode lf = firstActiveNode; lf != null; lf = lf.getNextActive()) {
-            if (lf.getId() == logFileId) {
-                return lf;
-            }
-
-            // Short cut since id's will only increment
-            if (logFileId < lf.getId())
-                break;
-        }
-        return null;
-    }
-
-    /**
-     * @param lastLocation
-     * @return
-     */
-    public Location getNextDataRecordLocation(Location lastLocation) throws IOException, InvalidRecordLocationException {
-        RecordInfo ri = readRecordInfo(lastLocation);
-        while (true) {
-
-            int logFileId = ri.getLocation().getLogFileId();
-            int offset = ri.getNextLocation();
-
-            // Are we overflowing into next logFile?
-            if (offset >= ri.getLogFileState().getAppendOffset()) {
-                LogFileNode nextActive = ri.getLogFileState().getNextActive();
-                if (nextActive == null) {
-                    return null;
-                }
-                logFileId = nextActive.getId();
-                offset = 0;
-            }
-
-            try {
-                ri = readRecordInfo(new Location(logFileId, offset));
-            } catch (InvalidRecordLocationException e) {
-                return null;
-            }
-
-            // Is the next record the right record type?
-            if (ri.getHeader().getRecordType() == DATA_RECORD_TYPE) {
-                return ri.getLocation();
-            }
-            // No? go onto the next record.
-        }
-    }
-
-    /**
-     * @param logFileIndex
-     * @param logFileOffset
-     * @return
-     * @throws IOException
-     * @throws InvalidRecordLocationException
-     */
-    public Packet readPacket(Location location) throws IOException, InvalidRecordLocationException {
-
-        // Is there a record header at the seeked location?
-        RecordInfo recordInfo = readRecordInfo(location);
-
-        byte data[] = new byte[recordInfo.getHeader().getPayloadLength()];
-
-        LogFile logFile = recordInfo.getLogFile();
-        logFile.read(recordInfo.getDataOffset(), data);
-
-        return new ByteArrayPacket(data);
-
-    }
-
-    public int getInitialLogFileSize() {
-        return initialLogFileSize;
-    }
-
-    public Location getFirstActiveLogLocation() {
-        if (firstActiveNode == null)
-            return null;
-        if (firstActiveNode.getAppendOffset() == 0)
-            return null;
-        return new Location(firstActiveNode.getId(), 0);
-    }
-
-    void activateNextLogFile() throws IOException {
-
-        // The current append logFile becomes readonly
-        if (appendNode != null) {
-            appendNode.setReadOnly(true);
-        }
-
-        LogFileNode next = firstInactiveNode;
-        synchronized (this) {
-            firstInactiveNode = firstInactiveNode.getNextInactive();
-            next.activate(getNextLogFileId());
-            if (firstActiveNode == null) {
-                firstActiveNode = next;
-            }
-        }        
-        activeLogFileCount.incrementAndGet();        
-        appendNode = next;
-        
-        storeState();
-    }
-
-    /**
-     * @return Returns the logDirectory.
-     */
-    public File getLogDirectory() {
-        return logDirectory;
-    }
-
-    /**
-     * @return Returns the lastMark.
-     */
-    public Location getLastMarkedRecordLocation() {
-        return lastMark;
-    }
-
-    public Location getNextAppendLocation() {
-        return new Location(appendNode.getId(), appendNode.getAppendOffset());
-    }
-
-    /**
-     * @return Returns the onlineLogFileCount.
-     */
-    public int getOnlineLogFileCount() {
-        return onlineLogFileCount;
-    }
-
-    public boolean isPastHalfActive() {
-        return (onlineLogFileCount/2.f) < activeLogFileCount.get();
-    }
-
-    synchronized  public Location getFirstRecordLocationOfSecondActiveLogFile() {
-        return firstActiveNode.getNextActive().getFirstRecordLocation();
-    }
-
-    synchronized public boolean canActivateNextLogFile() {
-        return firstInactiveNode!=null;
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.NumberFormat;
+import java.util.HashMap;
+
+import org.apache.activeio.adapter.PacketOutputStream;
+import org.apache.activeio.adapter.PacketToInputStream;
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.Packet;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Provides a logical view of many separate files as one single long log file.
+ * The separate files that compose the LogFile are Segments of the LogFile.
+ * <p/>This class is not thread safe.
+ * 
+ * @version $Revision: 1.1 $
+ */
+final public class LogFileManager {
+
+    public static final int DEFAULT_LOGFILE_COUNT = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultLogFileCount", ""+(2)));
+    public static final int DEFAULT_LOGFILE_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultLogFileSize", ""+(1024*1024*20)));
+
+    static final public int SERIALIZED_SIZE = 6+Location.SERIALIZED_SIZE;
+
+    static final public byte DATA_RECORD_TYPE = 1;
+    static final public byte MARK_RECORD_TYPE = 2;
+    static final private NumberFormat onlineLogNameFormat = NumberFormat.getNumberInstance();
+    static {
+        onlineLogNameFormat.setMinimumIntegerDigits(3);
+        onlineLogNameFormat.setMaximumIntegerDigits(3);
+        onlineLogNameFormat.setGroupingUsed(false);
+        onlineLogNameFormat.setParseIntegerOnly(true);
+        onlineLogNameFormat.setMaximumFractionDigits(0);
+    }
+    
+    static final private NumberFormat archiveLogNameFormat = NumberFormat.getNumberInstance();
+    static {
+        archiveLogNameFormat.setMinimumIntegerDigits(8);
+        archiveLogNameFormat.setMaximumIntegerDigits(8);
+        archiveLogNameFormat.setGroupingUsed(false);
+        archiveLogNameFormat.setParseIntegerOnly(true);
+        archiveLogNameFormat.setMaximumFractionDigits(0);
+    }
+
+    // Config
+    private final File logDirectory;
+    private final int initialLogFileSize;
+    private final int onlineLogFileCount;
+    private final AtomicInteger activeLogFileCount = new AtomicInteger(0);
+    
+    // Keeps track of the online log file.
+    private LogFileNode firstNode;
+    private LogFileNode firstActiveNode;
+    private LogFileNode firstInactiveNode;
+    private LogFileNode appendNode;
+
+    private ControlFile controlFile;
+    private int lastLogFileId = -1;
+    private Location lastMark;
+    private boolean disposed;
+    private boolean loadedFromCleanShutDown;
+    
+    private File archiveDirectory;
+    HashMap openArchivedLogs = new HashMap();
+
+    public LogFileManager(File logDirectory) throws IOException {
+        this(logDirectory, DEFAULT_LOGFILE_COUNT, DEFAULT_LOGFILE_SIZE, null);
+    }
+
+    public LogFileManager(File logDirectory, int onlineLogFileCount, int initialLogFileSize, File archiveDirectory) throws IOException {
+        this.logDirectory = logDirectory;
+        this.onlineLogFileCount = onlineLogFileCount;
+        this.initialLogFileSize = initialLogFileSize;
+        initialize(onlineLogFileCount);
+        this.archiveDirectory=archiveDirectory;
+    }
+
+    void initialize(int onlineLogFileCount) throws IOException {
+
+        LogFileNode logFiles[] = new LogFileNode[onlineLogFileCount];
+
+        // Create the log directory if it does not exist.
+        if (!logDirectory.exists()) {
+            if (!logDirectory.mkdirs()) {
+                throw new IOException("Could not create directory: " + logDirectory);
+            }
+        }
+
+        // Open the control file.        
+        int controlDataSize = SERIALIZED_SIZE + (LogFileNode.SERIALIZED_SIZE*onlineLogFileCount);
+        controlFile = new ControlFile(new File(logDirectory, "control.dat"),  controlDataSize);
+        // Make sure we are the only process using the control file.
+        controlFile.lock();
+        
+        // Initialize the nodes.
+        for (int i = 0; i < onlineLogFileCount; i++) {
+            LogFile file = new LogFile(new File(logDirectory, "log-" + onlineLogNameFormat.format(i) + ".dat"),
+                    initialLogFileSize);
+            logFiles[i] = new LogFileNode(file);
+        }
+
+        // Link the nodes together.
+        for (int i = 0; i < onlineLogFileCount; i++) {
+            if (i == (onlineLogFileCount - 1)) {
+                logFiles[i].setNext(logFiles[0]);
+            } else {
+                logFiles[i].setNext(logFiles[i + 1]);
+            }
+        }
+        
+        firstNode = logFiles[0];
+        loadState();
+
+        // Find the first active node
+        for (int i = 0; i < onlineLogFileCount; i++) {
+            if( logFiles[i].isActive() ) {
+                if( firstActiveNode == null || logFiles[i].getId() < firstActiveNode.getId() ) {
+                    firstActiveNode = logFiles[i];
+                }
+            }
+        }
+        
+        // None was active? activate one.
+        if ( firstActiveNode == null ) {
+            firstInactiveNode = logFiles[0];
+            activateNextLogFile();
+        } else {            
+            // Find the append log and the first inactive node
+            firstInactiveNode = null;
+            LogFileNode log = firstActiveNode;
+            do {
+                if( !log.isActive() ) {
+                    firstInactiveNode = log;
+                    break;
+                } else {
+                    appendNode = log;
+                }
+                log = log.getNext();
+            } while (log != firstActiveNode);
+        }
+        
+        // If we did not have a clean shut down then we have to check the state 
+        // of the append log.
+        if( !this.loadedFromCleanShutDown ) {
+            checkAppendLog();
+        }
+                    
+        loadedFromCleanShutDown = false;
+        storeState();
+    }
+
+    private void checkAppendLog() throws IOException {
+        
+        // We are trying to get the true append offset and the last Mark that was written in 
+        // the append log.
+        
+        int offset = 0;
+        Record record = new Record();
+        LogFile logFile = appendNode.getLogFile();
+        Location markLocation=null;
+        
+        while( logFile.loadAndCheckRecord(offset, record) ) {
+            
+            if( record.getLocation().getLogFileId()!= appendNode.getId() || record.getLocation().getLogFileOffset()!=offset ) {
+                // We must have run past the end of the append location.
+                break;
+            }
+
+            if ( record.getRecordType()==LogFileManager.MARK_RECORD_TYPE) {
+                markLocation = record.getLocation();
+            }
+            
+            offset += record.getRecordLength();            
+        }
+        
+        appendNode.setAppendOffset(offset);
+        
+        if( markLocation!=null ) {
+            try {
+                Packet packet = readPacket(markLocation);
+                markLocation = Location.readFromPacket(packet);
+            } catch (InvalidRecordLocationException e) {
+                throw (IOException)new IOException(e.getMessage()).initCause(e);
+            }
+            updateMark(markLocation);
+        }
+        
+    }
+
+    private void storeState() throws IOException {
+        Packet controlData = controlFile.getControlData();
+        if( controlData.remaining() == 0 )
+            return;
+        
+        DataOutput data = new DataOutputStream(new PacketOutputStream(controlData));
+
+        data.writeInt(lastLogFileId);
+        data.writeBoolean(lastMark!=null);
+        if( lastMark!=null )
+            lastMark.writeToDataOutput(data);
+        data.writeBoolean(loadedFromCleanShutDown);
+        
+        // Load each node's state
+        LogFileNode log = firstNode;
+        do {            
+            log.writeExternal( data );
+            log = log.getNext();
+        } while (log != firstNode);
+        
+        controlFile.store();
+    }
+
+    private void loadState() throws IOException {
+        if( controlFile.load() ) {
+            Packet controlData = controlFile.getControlData();
+            if( controlData.remaining() == 0 )
+                return;
+            
+            DataInput data = new DataInputStream(new PacketToInputStream(controlData));
+    
+            lastLogFileId =data.readInt();
+            if( data.readBoolean() )
+                lastMark = Location.readFromDataInput(data);
+            else
+                lastMark = null;
+            loadedFromCleanShutDown = data.readBoolean();
+    
+            // Load each node's state
+            LogFileNode log = firstNode;
+            do {            
+                log.readExternal( data );
+                log = log.getNext();
+            } while (log != firstNode);
+        }
+    }
+
+    public void dispose() {
+
+        if (disposed)
+            return;
+        this.disposed = true;
+        
+        try {
+	        // Close all the opened log files.
+	        LogFileNode log = firstNode;
+	        do {
+	            log.getLogFile().dispose();
+	            log = log.getNext();
+	        } while (log != firstNode);
+	        
+	        loadedFromCleanShutDown=true;
+	        storeState();
+	        controlFile.dispose();
+        } catch ( IOException e ) {        	
+        }
+        
+    }
+
+    private int getNextLogFileId() {
+        return ++lastLogFileId;
+    }
+
+    /**
+     * @param write
+     * @throws IOException
+     */
+    public void append(BatchedWrite write) throws IOException {
+
+        if (!appendNode.isActive())
+            throw new IllegalStateException("Log file is not active.  Writes are not allowed");
+        if (appendNode.isReadOnly())
+            throw new IllegalStateException("Log file has been marked Read Only.  Writes are not allowed");
+
+        // Write and force the data to disk.
+        LogFile logFile = appendNode.getLogFile();
+        ByteBuffer buffer = ((ByteBufferPacket)write.getPacket().getAdapter(ByteBufferPacket.class)).getByteBuffer();
+        int size = buffer.remaining();
+        logFile.write(appendNode.getAppendOffset(), buffer);
+        if( write.getForce() )
+            logFile.force();
+
+        // Update state
+        appendNode.appended(size);
+        if (write.getMark() != null) {
+            updateMark(write.getMark());
+        }
+    }
+
+    /**
+     * Records a new mark and deactivates any active log files that now fall
+     * entirely behind the marked location, archiving each one first when an
+     * archive directory is configured.
+     *
+     * @param mark the location of the newly written mark record
+     * @throws IOException if archiving or deactivating a log file fails
+     */
+    synchronized private void updateMark(Location mark) throws IOException {
+        // If we wrote a mark we may need to deactivate some log files.
+        this.lastMark = mark;
+        while (firstActiveNode != appendNode) {
+            if (firstActiveNode.getId() < lastMark.getLogFileId()) {
+                
+                // Copy the log file into the archive before releasing it for reuse.
+                if( archiveDirectory!=null ) {
+                    File file = getArchiveFile(firstActiveNode.getId());
+                    firstActiveNode.getLogFile().copyTo(file);
+                }
+                
+                // The freed node joins the inactive set and may become its head.
+                firstActiveNode.deactivate();
+                activeLogFileCount.decrementAndGet();
+                if( firstInactiveNode == null )
+                    firstInactiveNode = firstActiveNode;
+                firstActiveNode = firstActiveNode.getNextActive();
+                
+            } else {
+                break;
+            }
+        }
+    }
+    
+    /** @return the archive file for the given log id, named via archiveLogNameFormat with a ".log" suffix. */
+    private File getArchiveFile(int logId) {
+        return  new File(archiveDirectory, "" + archiveLogNameFormat.format(logId) + ".log");
+    }
+    
+    /**
+     * Locates the record stored at the given location and reads its header.
+     *
+     * @param location the location of the record to look up
+     * @return the record's header together with the log file that holds it;
+     *         the returned RecordInfo's log file state is null for archived files
+     * @throws InvalidRecordLocationException if no record exists at the
+     *         location, or its log file is neither active nor archived
+     * @throws IOException if the archived log file cannot be opened
+     */
+    RecordInfo readRecordInfo(Location location) throws IOException, InvalidRecordLocationException {
+
+        LogFile logFile;
+        LogFileNode logFileState = getLogFileWithId(location.getLogFileId());
+        if( logFileState !=null ) {
+            // There can be no record at the append offset.
+            if (logFileState.getAppendOffset() == location.getLogFileOffset()) {
+                throw new InvalidRecordLocationException("No record at (" + location
+                        + ") found.  Location past end of logged data.");
+            }
+            logFile = logFileState.getLogFile();
+        } else {
+            // Not active: fall back to the archive, when one is configured.
+            if( archiveDirectory==null ) {
+                throw new InvalidRecordLocationException("Log file: " + location.getLogFileId() + " is not active.");
+            } else {
+                logFile = getArchivedLogFile(location.getLogFileId());
+            }
+        }
+
+        // Is there a record header at the seeked location?
+        try {
+            Record header = new Record();
+            logFile.readRecordHeader(location.getLogFileOffset(), header);
+            return new RecordInfo(location, header, logFileState, logFile);
+        } catch (IOException e) {
+            // NOTE(review): the underlying IOException is discarded here;
+            // consider chaining it as the cause of the thrown exception.
+            throw new InvalidRecordLocationException("No record at (" + location + ") found.");
+        }
+    }
+
+    /**
+     * Opens (and caches in openArchivedLogs) a LogFile over the archived copy
+     * of the given log file id.
+     *
+     * @param logFileId the id of the archived log file
+     * @return the cached or newly opened archived log file
+     * @throws InvalidRecordLocationException if the archive file is not readable
+     * @throws IOException if the archive file cannot be opened
+     */
+    private LogFile getArchivedLogFile(int logFileId) throws InvalidRecordLocationException, IOException {
+        Integer key = new Integer(logFileId);
+        LogFile rc = (LogFile) openArchivedLogs.get(key);
+        if( rc == null ) {
+            File archiveFile = getArchiveFile(logFileId);
+            if( !archiveFile.canRead() )
+                throw new InvalidRecordLocationException("Log file: " + logFileId + " does not exist.");
+            rc = new LogFile(archiveFile, getInitialLogFileSize());
+            openArchivedLogs.put(key, rc);
+            
+            // TODO: turn openArchivedLogs into LRU cache and close old log files.
+        }
+        return rc;
+    }
+
+    /**
+     * Walks the list of active log file nodes looking for the given id.
+     *
+     * @param logFileId the log file id to find
+     * @return the active node with that id, or null when the id is not active
+     *         (e.g. already deactivated or archived)
+     */
+    LogFileNode getLogFileWithId(int logFileId) throws InvalidRecordLocationException {
+        for (LogFileNode lf = firstActiveNode; lf != null; lf = lf.getNextActive()) {
+            if (lf.getId() == logFileId) {
+                return lf;
+            }
+
+            // Short cut since id's will only increment
+            if (logFileId < lf.getId())
+                break;
+        }
+        return null;
+    }
+
+    /**
+     * Scans forward from the given location for the next record whose type is
+     * DATA_RECORD_TYPE, following overflow into the next active log file.
+     *
+     * @param lastLocation the location of the record to start scanning after
+     * @return the location of the next data record, or null when the end of
+     *         the logged data is reached
+     * @throws IOException if a log file cannot be read
+     * @throws InvalidRecordLocationException if lastLocation does not hold a record
+     */
+    public Location getNextDataRecordLocation(Location lastLocation) throws IOException, InvalidRecordLocationException {
+        RecordInfo ri = readRecordInfo(lastLocation);
+        while (true) {
+
+            int logFileId = ri.getLocation().getLogFileId();
+            int offset = ri.getNextLocation();
+
+            // Are we overflowing into next logFile?
+            if (offset >= ri.getLogFileState().getAppendOffset()) {
+                LogFileNode nextActive = ri.getLogFileState().getNextActive();
+                if (nextActive == null) {
+                    return null;
+                }
+                logFileId = nextActive.getId();
+                offset = 0;
+            }
+
+            try {
+                ri = readRecordInfo(new Location(logFileId, offset));
+            } catch (InvalidRecordLocationException e) {
+                // No record there: we have run off the end of the logged data.
+                return null;
+            }
+
+            // Is the next record the right record type?
+            if (ri.getHeader().getRecordType() == DATA_RECORD_TYPE) {
+                return ri.getLocation();
+            }
+            // No? go onto the next record.
+        }
+    }
+
+    /**
+     * Reads the payload data of the record stored at the given location.
+     *
+     * @param location the location of the record to read
+     * @return a packet wrapping a copy of the record's payload bytes
+     * @throws IOException if the log file cannot be read
+     * @throws InvalidRecordLocationException if no record exists at the location
+     */
+    public Packet readPacket(Location location) throws IOException, InvalidRecordLocationException {
+
+        // Is there a record header at the seeked location?
+        RecordInfo recordInfo = readRecordInfo(location);
+
+        byte data[] = new byte[recordInfo.getHeader().getPayloadLength()];
+
+        LogFile logFile = recordInfo.getLogFile();
+        logFile.read(recordInfo.getDataOffset(), data);
+
+        return new ByteArrayPacket(data);
+
+    }
+
+    /** @return the initial size used when creating or resizing log files. */
+    public int getInitialLogFileSize() {
+        return initialLogFileSize;
+    }
+
+    /**
+     * @return the location of the first record in the first active log file,
+     *         or null when there is no active log file or nothing has been
+     *         appended to it yet.
+     */
+    public Location getFirstActiveLogLocation() {
+        if (firstActiveNode == null)
+            return null;
+        if (firstActiveNode.getAppendOffset() == 0)
+            return null;
+        return new Location(firstActiveNode.getId(), 0);
+    }
+
+    /**
+     * Makes the next inactive log file the append target, assigning it a new
+     * id; the previous append file (if any) is marked read only first, and the
+     * new state is persisted via storeState().
+     * NOTE(review): assumes firstInactiveNode is non-null; callers presumably
+     * check canActivateNextLogFile() first — confirm.
+     *
+     * @throws IOException if the new state cannot be stored
+     */
+    void activateNextLogFile() throws IOException {
+
+        // The current append logFile becomes readonly
+        if (appendNode != null) {
+            appendNode.setReadOnly(true);
+        }
+
+        LogFileNode next = firstInactiveNode;
+        synchronized (this) {
+            firstInactiveNode = firstInactiveNode.getNextInactive();
+            next.activate(getNextLogFileId());
+            if (firstActiveNode == null) {
+                firstActiveNode = next;
+            }
+        }        
+        activeLogFileCount.incrementAndGet();        
+        appendNode = next;
+        
+        storeState();
+    }
+
+    /**
+     * @return the directory that holds the log files.
+     */
+    public File getLogDirectory() {
+        return logDirectory;
+    }
+
+    /**
+     * @return the location of the last recorded mark (see updateMark).
+     */
+    public Location getLastMarkedRecordLocation() {
+        return lastMark;
+    }
+
+    /** @return the location where the next append will land: current append file id and offset. */
+    public Location getNextAppendLocation() {
+        return new Location(appendNode.getId(), appendNode.getAppendOffset());
+    }
+
+    /**
+     * @return the onlineLogFileCount — the number of log files kept online
+     *         (presumably active plus inactive; compare isPastHalfActive).
+     */
+    public int getOnlineLogFileCount() {
+        return onlineLogFileCount;
+    }
+
+    /** @return true when more than half of the online log files are currently active. */
+    public boolean isPastHalfActive() {
+        return (onlineLogFileCount/2.f) < activeLogFileCount.get();
+    }
+
+    /**
+     * @return the first record location of the second active log file.
+     *         NOTE(review): assumes at least two log files are active,
+     *         otherwise this NPEs — confirm callers guarantee that.
+     */
+    synchronized  public Location getFirstRecordLocationOfSecondActiveLogFile() {
+        return firstActiveNode.getNextActive().getFirstRecordLocation();
+    }
+
+    /** @return true if an inactive log file is available to become the next append target. */
+    synchronized public boolean canActivateNextLogFile() {
+        return firstInactiveNode!=null;
+    }
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileNode.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileNode.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileNode.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileNode.java Tue Feb 21 15:12:56 2006
@@ -1,160 +1,160 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * @version $Revision: 1.1 $
- */
-final class LogFileNode {
-    
-    static final public int SERIALIZED_SIZE = 10;
-
-    private final LogFile logFile;    
-    private LogFileNode next;
-
-    /** The id of the log file. */
-    private int id;
-    /** Does it have live records in it? */
-    private boolean active = false;
-    /** Is the log file in readonly mode */
-    private boolean readOnly;
-    /** The location of the next append offset */
-    private int appendOffset = 0;
-
-    public LogFileNode(LogFile logFile) {
-        this.logFile = logFile;
-    }
-
-    public LogFile getLogFile() {
-        return logFile;
-    }
-
-    /////////////////////////////////////////////////////////////
-    //
-    // Method used to mange the state of the log file.
-    //
-    /////////////////////////////////////////////////////////////
-
-    public void activate(int id) {
-        if (active)
-            throw new IllegalStateException("Log already active.");
-        this.id = id;
-        this.readOnly = false;
-        this.active = true;
-        this.appendOffset = 0;
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    public void setReadOnly(boolean enable) {
-        if (!active)
-            throw new IllegalStateException("Log not active.");
-        this.readOnly = enable;
-    }
-
-    public void deactivate() throws IOException {
-        if (!active)
-            throw new IllegalStateException("Log already inactive.");      
-        this.active=false; 
-        this.id = -1;
-        this.readOnly = true;
-        this.appendOffset = 0;
-        getLogFile().resize();
-    }
-
-    public boolean isActive() {
-        return active;
-    }
-
-    public int getAppendOffset() {
-        return appendOffset;
-    }
-
-    public Location getFirstRecordLocation() {
-        if (isActive() && appendOffset > 0)
-            return new Location(getId(), 0);
-        return null;
-    }
-
-    public boolean isReadOnly() {
-        return readOnly;
-    }
-
-    public void appended(int i) {
-        appendOffset += i;
-    }
-    
-    /////////////////////////////////////////////////////////////
-    //
-    // Method used to maintain the list of LogFileNodes used by 
-    // the LogFileManager
-    //
-    /////////////////////////////////////////////////////////////
-    
-    public LogFileNode getNext() {
-        return next;
-    }
-
-    public void setNext(LogFileNode state) {
-        next = state;
-    }
-
-    public LogFileNode getNextActive() {
-        if (getNext().isActive())
-            return getNext();
-        return null;
-    }
-
-    public LogFileNode getNextInactive() {
-        if (!getNext().isActive())
-            return getNext();
-        return null;
-    }
-    
-    /**
-     * @param data
-     * @throws IOException 
-     */
-    public void writeExternal(DataOutput data) throws IOException {
-        data.writeInt(id);
-        data.writeBoolean(active);
-        data.writeBoolean(readOnly);
-        data.writeInt(appendOffset);
-    }
-
-    /**
-     * @param data
-     * @throws IOException 
-     */
-    public void readExternal(DataInput data) throws IOException {
-        id = data.readInt();
-        active = data.readBoolean();
-        readOnly = data.readBoolean();
-        appendOffset = data.readInt();
-    }
-
-    public void setAppendOffset(int offset) {
-        appendOffset = offset;
-    }
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A node in the linked list of log files kept by the LogFileManager.  Pairs a
+ * LogFile with its bookkeeping state: id, active and read-only flags, and the
+ * next append offset.
+ *
+ * @version $Revision: 1.1 $
+ */
+final class LogFileNode {
+    
+    /** Serialized form is id(4) + active(1) + readOnly(1) + appendOffset(4) bytes; see writeExternal(). */
+    static final public int SERIALIZED_SIZE = 10;
+
+    private final LogFile logFile;    
+    private LogFileNode next;
+
+    /** The id of the log file. */
+    private int id;
+    /** Does it have live records in it? */
+    private boolean active = false;
+    /** Is the log file in readonly mode */
+    private boolean readOnly;
+    /** The location of the next append offset */
+    private int appendOffset = 0;
+
+    public LogFileNode(LogFile logFile) {
+        this.logFile = logFile;
+    }
+
+    public LogFile getLogFile() {
+        return logFile;
+    }
+
+    /////////////////////////////////////////////////////////////
+    //
+    // Methods used to manage the state of the log file.
+    //
+    /////////////////////////////////////////////////////////////
+
+    /** Marks this node active under a new id, writable, with an empty append offset. */
+    public void activate(int id) {
+        if (active)
+            throw new IllegalStateException("Log already active.");
+        this.id = id;
+        this.readOnly = false;
+        this.active = true;
+        this.appendOffset = 0;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    /** Toggles the read-only flag; only valid while the node is active. */
+    public void setReadOnly(boolean enable) {
+        if (!active)
+            throw new IllegalStateException("Log not active.");
+        this.readOnly = enable;
+    }
+
+    /** Returns this node to the inactive state and resizes its log file. */
+    public void deactivate() throws IOException {
+        if (!active)
+            throw new IllegalStateException("Log already inactive.");      
+        this.active=false; 
+        this.id = -1;
+        this.readOnly = true;
+        this.appendOffset = 0;
+        getLogFile().resize();
+    }
+
+    public boolean isActive() {
+        return active;
+    }
+
+    public int getAppendOffset() {
+        return appendOffset;
+    }
+
+    /** @return the location of the first record, or null if inactive or empty. */
+    public Location getFirstRecordLocation() {
+        if (isActive() && appendOffset > 0)
+            return new Location(getId(), 0);
+        return null;
+    }
+
+    public boolean isReadOnly() {
+        return readOnly;
+    }
+
+    /** Advances the append offset by the given number of bytes just written. */
+    public void appended(int i) {
+        appendOffset += i;
+    }
+    
+    /////////////////////////////////////////////////////////////
+    //
+    // Methods used to maintain the list of LogFileNodes used by 
+    // the LogFileManager
+    //
+    /////////////////////////////////////////////////////////////
+    
+    public LogFileNode getNext() {
+        return next;
+    }
+
+    public void setNext(LogFileNode state) {
+        next = state;
+    }
+
+    /**
+     * @return the next node when it is active, otherwise null.
+     *         NOTE(review): assumes the nodes form a circular list so
+     *         getNext() never returns null — confirm with LogFileManager.
+     */
+    public LogFileNode getNextActive() {
+        if (getNext().isActive())
+            return getNext();
+        return null;
+    }
+
+    /** @return the next node when it is inactive, otherwise null (see note on getNextActive). */
+    public LogFileNode getNextInactive() {
+        if (!getNext().isActive())
+            return getNext();
+        return null;
+    }
+    
+    /**
+     * Serializes this node's state (id, flags, append offset) — SERIALIZED_SIZE bytes.
+     * @param data the output to write to
+     * @throws IOException 
+     */
+    public void writeExternal(DataOutput data) throws IOException {
+        data.writeInt(id);
+        data.writeBoolean(active);
+        data.writeBoolean(readOnly);
+        data.writeInt(appendOffset);
+    }
+
+    /**
+     * Restores this node's state from the form produced by writeExternal.
+     * @param data the input to read from
+     * @throws IOException 
+     */
+    public void readExternal(DataInput data) throws IOException {
+        id = data.readInt();
+        active = data.readBoolean();
+        readOnly = data.readBoolean();
+        appendOffset = data.readInt();
+    }
+
+    public void setAppendOffset(int offset) {
+        appendOffset = offset;
+    }
+
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFileNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Record.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Record.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Record.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Record.java Tue Feb 21 15:12:56 2006
@@ -1,307 +1,307 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.zip.CRC32;
-
-import org.apache.activeio.adapter.PacketOutputStream;
-import org.apache.activeio.adapter.PacketToInputStream;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
-
-
-/**
- * Serializes/Deserializes data records. 
- * 
- * @version $Revision: 1.1 $
- */
-final public class Record {
-    
-    static final public int RECORD_HEADER_SIZE=8+Location.SERIALIZED_SIZE;
-    static final public int RECORD_FOOTER_SIZE=12+Location.SERIALIZED_SIZE;
-	static final public int RECORD_BASE_SIZE=RECORD_HEADER_SIZE+RECORD_FOOTER_SIZE;
-	
-    static final public byte[] START_OF_RECORD 	= new byte[] { 'S', 'o', 'R' }; 
-    static final public byte[] END_OF_RECORD 	= new byte[] { 'E', 'o', 'R', '.' }; 
-        
-	static final public int SELECTED_CHECKSUM_ALGORITHIM;
-	static final public int NO_CHECKSUM_ALGORITHIM=0;
-	static final public int HASH_CHECKSUM_ALGORITHIM=1;
-	static final public int CRC32_CHECKSUM_ALGORITHIM=2;
-	
-	static {
-		String type = System.getProperty("org.apache.activeio.journal.active.SELECTED_CHECKSUM_ALGORITHIM", "none");
-		if( "none".equals(type) ) {
-			SELECTED_CHECKSUM_ALGORITHIM = NO_CHECKSUM_ALGORITHIM;			
-		} else if( "crc32".equals(type) ) {
-			SELECTED_CHECKSUM_ALGORITHIM = CRC32_CHECKSUM_ALGORITHIM;			
-		} else if( "hash".equals(type) ) {
-			SELECTED_CHECKSUM_ALGORITHIM = HASH_CHECKSUM_ALGORITHIM;			
-		} else {
-			System.err.println("System property 'org.apache.activeio.journal.active.SELECTED_CHECKSUM_ALGORITHIM' not set properly.  Valid values are: 'none', 'hash', or 'crc32'");
-			SELECTED_CHECKSUM_ALGORITHIM = NO_CHECKSUM_ALGORITHIM;			
-		}
-	}
-	
-	static public boolean isChecksumingEnabled() {
-		return SELECTED_CHECKSUM_ALGORITHIM!=NO_CHECKSUM_ALGORITHIM;
-	}
-		    
-    private final ByteArrayPacket headerFooterPacket = new ByteArrayPacket(new byte[RECORD_BASE_SIZE]);
-    private final DataOutputStream headerFooterData = new DataOutputStream(new PacketOutputStream(headerFooterPacket));
-
-    private int payloadLength;
-    private Location location;
-    private byte recordType;        
-    private long checksum;	
-    private Location mark;
-    private Packet payload;
- 		
-    public Record() {        
-    }
-
-    public Record(byte recordType, Packet payload, Location mark) throws IOException {
-        this(null, recordType, payload, mark);
-    }
-    
-    public Record(Location location, byte recordType, Packet payload, Location mark) throws IOException {
-        this.location = location;
-        this.recordType = recordType;
-        this.mark = mark;
-        this.payload = payload.slice();
-        this.payloadLength = payload.remaining();
-        if( isChecksumingEnabled() ) {
-            checksum(new DataInputStream(new PacketToInputStream(this.payload)));
-        }
-
-        writeHeader(headerFooterData);
-        writeFooter(headerFooterData);
-    }    
-    
-    public void setLocation(Location location) throws IOException {
-        this.location = location;
-        headerFooterPacket.clear();
-        headerFooterPacket.position(8);
-        location.writeToDataOutput(headerFooterData);
-        headerFooterPacket.position(RECORD_HEADER_SIZE+8);
-        location.writeToDataOutput(headerFooterData);
-        payload.clear();
-        headerFooterPacket.position(0);
-        headerFooterPacket.limit(RECORD_HEADER_SIZE);
-    }
-    
-	private void writeHeader( DataOutput out ) throws IOException {
-	    out.write(START_OF_RECORD);
-	    out.writeByte(recordType);
-	    out.writeInt(payloadLength);
-        if( location!=null )
-            location.writeToDataOutput(out);
-        else
-            out.writeLong(0);
-	}	
-	
-	public void readHeader( DataInput in ) throws IOException {
-        readAndCheckConstant(in, START_OF_RECORD, "Invalid record header: start of record constant missing.");
-        recordType = in.readByte();
-        payloadLength = in.readInt();
-        if( payloadLength < 0 )
-            throw new IOException("Invalid record header: record length cannot be less than zero.");
-        location = Location.readFromDataInput(in);
-	}
-	
-	private void writeFooter( DataOutput out ) throws IOException {
-	    out.writeLong(checksum);
-        if( location!=null )
-            location.writeToDataOutput(out);
-        else
-            out.writeLong(0);
-	    out.write(END_OF_RECORD);
-	}
-	
-	public void readFooter( DataInput in ) throws IOException {
-	    long l = in.readLong();	    
-        if( isChecksumingEnabled() ) {
-            if( l!=checksum )            
-                throw new IOException("Invalid record footer: checksum does not match.");
-        } else {
-            checksum = l;            
-        }
-        
-        Location loc = Location.readFromDataInput(in);
-        if( !loc.equals(location) )
-            throw new IOException("Invalid record footer: location id does not match.");
-        
-        readAndCheckConstant(in, END_OF_RECORD, "Invalid record header: end of record constant missing.");
-	}
-	
-    /**
-     * @param randomAccessFile
-     * @throws IOException
-     */
-	public void checksum(DataInput in) throws IOException {
-		if( SELECTED_CHECKSUM_ALGORITHIM==HASH_CHECKSUM_ALGORITHIM ) {
-
-		    byte  buffer[] = new byte[1024];
-			byte rc[] = new byte[8];
-			for (int i = 0; i < payloadLength;) {
-			    int l = Math.min(buffer.length, payloadLength-i);
-				in.readFully(buffer,0,l);
-				for (int j = 0; j < l; j++) {
-					rc[j%8] ^= buffer[j];			
-				}
-				i+=l;
-			}			
-			checksum = (rc[0])|(rc[1]<<1)|(rc[2]<<2)|(rc[3]<<3)|(rc[4]<<4)|(rc[5]<<5)|(rc[6]<<6)|(rc[7]<<7) ;
-			
-		} else if( SELECTED_CHECKSUM_ALGORITHIM==CRC32_CHECKSUM_ALGORITHIM ) {
-			byte  buffer[] = new byte[1024];
-			CRC32 crc32 = new CRC32();
-			for (int i = 0; i < payloadLength;) {
-			    int l = Math.min(buffer.length, payloadLength-i);
-				in.readFully(buffer,0,l);
-				crc32.update(buffer,0,l);
-				i+=l;
-			}			
-			checksum = crc32.getValue();
-		} else {
-		    checksum = 0L;
-		}
-    }
-
-	
-    /**
-     */
-    private void readAndCheckConstant(DataInput in, byte[] byteConstant, String errorMessage ) throws IOException {
-        for (int i = 0; i < byteConstant.length; i++) {
-            byte checkByte = byteConstant[i];
-            if( in.readByte()!= checkByte ) {
-                throw new IOException(errorMessage);
-            }
-        }
-    }    
-    
-    public boolean readFromPacket(Packet packet) throws IOException {
-        Packet dup = packet.duplicate();
-
-        if( dup.remaining() < RECORD_HEADER_SIZE )
-            return false;
-        DataInputStream is = new DataInputStream(new PacketToInputStream(dup));
-        readHeader( is );
-        if( dup.remaining() < payloadLength+RECORD_FOOTER_SIZE ) {
-            return false;
-        }
-        
-        // Set limit to create a slice of the payload.
-        dup.limit(dup.position()+payloadLength);
-        this.payload = dup.slice();        
-	    if( isChecksumingEnabled() ) {
-	        checksum(new DataInputStream(new PacketToInputStream(payload)));
-	    }
-	    
-	    // restore the limit and seek to the footer.
-        dup.limit(packet.limit());
-        dup.position(dup.position()+payloadLength);
-        readFooter(is);
-        
-        // If every thing went well.. advance the position of the orignal packet.
-        packet.position(dup.position());
-        dup.dispose();
-        return true;        
-    }
-    
-    /**
-     * @return Returns the checksum.
-     */
-    public long getChecksum() {
-        return checksum;
-    }
-
-    /**
-     * @return Returns the length.
-     */
-    public int getPayloadLength() {
-        return payloadLength;
-    }
-
-    /**
-     * @return Returns the length of the record .
-     */
-    public int getRecordLength() {
-        return payloadLength+Record.RECORD_BASE_SIZE;
-    }
-
-    /**
-     * @return Returns the location.
-     */
-    public Location getLocation() {
-        return location;
-    }
-    
-    /**
-     * @return Returns the mark.
-     */
-    public Location getMark() {
-        return mark;
-    }
-
-    /**
-     * @return Returns the payload.
-     */
-    public Packet getPayload() {
-        return payload;
-    }
-
-    /**
-     * @return Returns the recordType.
-     */
-    public byte getRecordType() {
-        return recordType;
-    }
-
-	public boolean hasRemaining() {
-		return headerFooterPacket.position()!=RECORD_BASE_SIZE;
-	}
-
-	public void read(Packet packet) {
-		
-		// push the header
-		headerFooterPacket.read(packet);
-		// push the payload.
-		payload.read(packet);
-		
-		// Can we switch to the footer now?
-		if( !payload.hasRemaining() && headerFooterPacket.position()==RECORD_HEADER_SIZE ) {
-			headerFooterPacket.position(RECORD_HEADER_SIZE);
-             headerFooterPacket.limit(RECORD_BASE_SIZE);
-			headerFooterPacket.read(packet);			
-		}
-		
-	}
-
-    public void dispose() {
-        if( payload!=null ) {
-            payload.dispose();
-            payload=null;
-        }
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.zip.CRC32;
+
+import org.apache.activeio.adapter.PacketOutputStream;
+import org.apache.activeio.adapter.PacketToInputStream;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+
+
+/**
+ * Serializes/Deserializes data records. 
+ * 
+ * @version $Revision: 1.1 $
+ */
+final public class Record {
+    
+    // Header = SoR(3) + type(1) + length(4) plus the serialized location; see writeHeader().
+    static final public int RECORD_HEADER_SIZE=8+Location.SERIALIZED_SIZE;
+    // Footer = checksum(8) + EoR(4) plus the serialized location; see writeFooter().
+    static final public int RECORD_FOOTER_SIZE=12+Location.SERIALIZED_SIZE;
+	static final public int RECORD_BASE_SIZE=RECORD_HEADER_SIZE+RECORD_FOOTER_SIZE;
+	
+    static final public byte[] START_OF_RECORD 	= new byte[] { 'S', 'o', 'R' }; 
+    static final public byte[] END_OF_RECORD 	= new byte[] { 'E', 'o', 'R', '.' }; 
+        
+	// Checksum algorithm selected once at class load; see the static initializer below.
+	static final public int SELECTED_CHECKSUM_ALGORITHIM;
+	static final public int NO_CHECKSUM_ALGORITHIM=0;
+	static final public int HASH_CHECKSUM_ALGORITHIM=1;
+	static final public int CRC32_CHECKSUM_ALGORITHIM=2;
+	
+	// Resolve the checksum algorithm from a system property; defaults to
+	// 'none', and falls back to 'none' (with a warning) on an invalid value.
+	static {
+		String type = System.getProperty("org.apache.activeio.journal.active.SELECTED_CHECKSUM_ALGORITHIM", "none");
+		if( "none".equals(type) ) {
+			SELECTED_CHECKSUM_ALGORITHIM = NO_CHECKSUM_ALGORITHIM;			
+		} else if( "crc32".equals(type) ) {
+			SELECTED_CHECKSUM_ALGORITHIM = CRC32_CHECKSUM_ALGORITHIM;			
+		} else if( "hash".equals(type) ) {
+			SELECTED_CHECKSUM_ALGORITHIM = HASH_CHECKSUM_ALGORITHIM;			
+		} else {
+			System.err.println("System property 'org.apache.activeio.journal.active.SELECTED_CHECKSUM_ALGORITHIM' not set properly.  Valid values are: 'none', 'hash', or 'crc32'");
+			SELECTED_CHECKSUM_ALGORITHIM = NO_CHECKSUM_ALGORITHIM;			
+		}
+	}
+	
+	/** @return true unless the 'none' checksum algorithm is selected. */
+	static public boolean isChecksumingEnabled() {
+		return SELECTED_CHECKSUM_ALGORITHIM!=NO_CHECKSUM_ALGORITHIM;
+	}
+		    
+    // Pre-serialized header + footer bytes for this record; filled by the
+    // main constructor and patched in place by setLocation().
+    private final ByteArrayPacket headerFooterPacket = new ByteArrayPacket(new byte[RECORD_BASE_SIZE]);
+    private final DataOutputStream headerFooterData = new DataOutputStream(new PacketOutputStream(headerFooterPacket));
+
+    private int payloadLength;
+    private Location location;
+    private byte recordType;        
+    private long checksum;	
+    private Location mark;
+    private Packet payload;
+ 		
+    /** Empty record, presumably for reading an existing record into (e.g. readFromPacket) — confirm callers. */
+    public Record() {        
+    }
+
+    /** Convenience constructor for a record whose location is assigned later via setLocation(). */
+    public Record(byte recordType, Packet payload, Location mark) throws IOException {
+        this(null, recordType, payload, mark);
+    }
+    
+    /**
+     * Builds a record over a slice of the given payload: computes the checksum
+     * (when enabled) and pre-serializes the header and footer.
+     *
+     * @param location the record's location, or null if not yet known
+     * @param recordType the record type byte stored in the header
+     * @param payload the payload data; a slice is kept, the packet itself is not copied
+     * @param mark the mark associated with this record, if any
+     * @throws IOException if checksumming or serializing the header/footer fails
+     */
+    public Record(Location location, byte recordType, Packet payload, Location mark) throws IOException {
+        this.location = location;
+        this.recordType = recordType;
+        this.mark = mark;
+        this.payload = payload.slice();
+        this.payloadLength = payload.remaining();
+        if( isChecksumingEnabled() ) {
+            checksum(new DataInputStream(new PacketToInputStream(this.payload)));
+        }
+
+        writeHeader(headerFooterData);
+        writeFooter(headerFooterData);
+    }    
+    
+    /**
+     * Assigns the record's location and patches it into the already
+     * serialized header and footer (the location sits 8 bytes into each —
+     * see writeHeader/writeFooter), then rewinds the packets so the record
+     * can be streamed out starting from the header.
+     *
+     * @param location the location assigned to this record
+     * @throws IOException if re-serializing the location fails
+     */
+    public void setLocation(Location location) throws IOException {
+        this.location = location;
+        headerFooterPacket.clear();
+        headerFooterPacket.position(8);
+        location.writeToDataOutput(headerFooterData);
+        headerFooterPacket.position(RECORD_HEADER_SIZE+8);
+        location.writeToDataOutput(headerFooterData);
+        payload.clear();
+        headerFooterPacket.position(0);
+        headerFooterPacket.limit(RECORD_HEADER_SIZE);
+    }
+    
+	/** Writes the SoR marker, record type, payload length and location (0L placeholder when unset). */
+	private void writeHeader( DataOutput out ) throws IOException {
+	    out.write(START_OF_RECORD);
+	    out.writeByte(recordType);
+	    out.writeInt(payloadLength);
+        if( location!=null )
+            location.writeToDataOutput(out);
+        else
+            out.writeLong(0);
+	}	
+	
+	/**
+	 * Reads and validates a record header in the form written by writeHeader.
+	 * @throws IOException if the SoR constant is missing or the length is negative
+	 */
+	public void readHeader( DataInput in ) throws IOException {
+        readAndCheckConstant(in, START_OF_RECORD, "Invalid record header: start of record constant missing.");
+        recordType = in.readByte();
+        payloadLength = in.readInt();
+        if( payloadLength < 0 )
+            throw new IOException("Invalid record header: record length cannot be less than zero.");
+        location = Location.readFromDataInput(in);
+	}
+	
+	private void writeFooter( DataOutput out ) throws IOException {
+	    out.writeLong(checksum);
+        if( location!=null )
+            location.writeToDataOutput(out);
+        else
+            out.writeLong(0);
+	    out.write(END_OF_RECORD);
+	}
+	
+	public void readFooter( DataInput in ) throws IOException {
+	    long l = in.readLong();	    
+        if( isChecksumingEnabled() ) {
+            if( l!=checksum )            
+                throw new IOException("Invalid record footer: checksum does not match.");
+        } else {
+            checksum = l;            
+        }
+        
+        Location loc = Location.readFromDataInput(in);
+        if( !loc.equals(location) )
+            throw new IOException("Invalid record footer: location id does not match.");
+        
+        readAndCheckConstant(in, END_OF_RECORD, "Invalid record header: end of record constant missing.");
+	}
+	
    /**
     * Computes the checksum of the next payloadLength bytes of the given
     * input using the selected algorithm (XOR hash, CRC32, or none) and
     * stores the result in the checksum field.
     *
     * @param in source positioned at the start of the payload
     * @throws IOException if fewer than payloadLength bytes can be read
     */
	public void checksum(DataInput in) throws IOException {
		if( SELECTED_CHECKSUM_ALGORITHIM==HASH_CHECKSUM_ALGORITHIM ) {

		    // XOR the payload into an 8-byte accumulator, then fold the
		    // accumulator bytes into a long.
		    byte  buffer[] = new byte[1024];
			byte rc[] = new byte[8];
			for (int i = 0; i < payloadLength;) {
			    int l = Math.min(buffer.length, payloadLength-i);
				in.readFully(buffer,0,l);
				for (int j = 0; j < l; j++) {
					rc[j%8] ^= buffer[j];			
				}
				i+=l;
			}			
			// NOTE(review): the shifts below are 1..7 rather than 8,16,...,56
			// and the signed bytes are not masked with 0xFF, so most of the
			// accumulator bits collapse into the low byte.  Looks unintended,
			// but changing it would invalidate checksums in existing journal
			// files — confirm before fixing.
			checksum = (rc[0])|(rc[1]<<1)|(rc[2]<<2)|(rc[3]<<3)|(rc[4]<<4)|(rc[5]<<5)|(rc[6]<<6)|(rc[7]<<7) ;
			
		} else if( SELECTED_CHECKSUM_ALGORITHIM==CRC32_CHECKSUM_ALGORITHIM ) {
			// Stream the payload through a CRC32 in 1 KB chunks.
			byte  buffer[] = new byte[1024];
			CRC32 crc32 = new CRC32();
			for (int i = 0; i < payloadLength;) {
			    int l = Math.min(buffer.length, payloadLength-i);
				in.readFully(buffer,0,l);
				crc32.update(buffer,0,l);
				i+=l;
			}			
			checksum = crc32.getValue();
		} else {
		    // Checksuming disabled: record a constant zero.
		    checksum = 0L;
		}
    }
+
+	
+    /**
+     */
+    private void readAndCheckConstant(DataInput in, byte[] byteConstant, String errorMessage ) throws IOException {
+        for (int i = 0; i < byteConstant.length; i++) {
+            byte checkByte = byteConstant[i];
+            if( in.readByte()!= checkByte ) {
+                throw new IOException(errorMessage);
+            }
+        }
+    }    
+    
    /**
     * Attempts to parse a complete record (header, payload, footer) from the
     * given packet.  All work is done on a duplicate, so the source packet is
     * only advanced once the whole record has been read and validated.
     *
     * @param packet source packet; its position is advanced past the record
     *               only on success
     * @return true if a full record was available and parsed; false if more
     *         data is needed (the packet is left untouched)
     * @throws IOException if a full record is present but fails validation
     */
    public boolean readFromPacket(Packet packet) throws IOException {
        Packet dup = packet.duplicate();

        if( dup.remaining() < RECORD_HEADER_SIZE )
            return false;
        DataInputStream is = new DataInputStream(new PacketToInputStream(dup));
        readHeader( is );
        if( dup.remaining() < payloadLength+RECORD_FOOTER_SIZE ) {
            return false;
        }
        
        // Set limit to create a slice of the payload.
        dup.limit(dup.position()+payloadLength);
        this.payload = dup.slice();        
        if( isChecksumingEnabled() ) {
            // Recompute now so readFooter() can compare against the stored value.
            checksum(new DataInputStream(new PacketToInputStream(payload)));
        }
        
        // restore the limit and seek to the footer.
        dup.limit(packet.limit());
        dup.position(dup.position()+payloadLength);
        readFooter(is);
        
        // If everything went well.. advance the position of the original packet.
        packet.position(dup.position());
        dup.dispose();
        return true;        
    }
+    
    /**
     * @return the payload checksum (0 when checksuming is disabled)
     */
    public long getChecksum() {
        return checksum;
    }

    /**
     * @return the length, in bytes, of the payload alone
     */
    public int getPayloadLength() {
        return payloadLength;
    }

    /**
     * @return the total length of the record: payload plus the fixed
     *         header/footer overhead (RECORD_BASE_SIZE)
     */
    public int getRecordLength() {
        return payloadLength+Record.RECORD_BASE_SIZE;
    }

    /**
     * @return the log-file location of this record, or null if not yet assigned
     */
    public Location getLocation() {
        return location;
    }
    
    /**
     * @return the mark location stored with this record
     */
    public Location getMark() {
        return mark;
    }

    /**
     * @return the payload packet slice (null after dispose())
     */
    public Packet getPayload() {
        return payload;
    }

    /**
     * @return the record type tag from the header
     */
    public byte getRecordType() {
        return recordType;
    }
+
	/**
	 * @return true while read(Packet) has not yet pushed out the complete
	 *         record; the header/footer packet's position only reaches
	 *         RECORD_BASE_SIZE once the footer has been fully consumed
	 */
	public boolean hasRemaining() {
		return headerFooterPacket.position()!=RECORD_BASE_SIZE;
	}
+
	/**
	 * Copies ("pushes") as much of this record as fits into the destination
	 * packet: first the pre-serialized header, then the payload, and — once
	 * the payload is exhausted — the footer.  Call repeatedly while
	 * {@link #hasRemaining()} is true.
	 *
	 * @param packet destination to copy record bytes into
	 */
	public void read(Packet packet) {
		
		// push the header
		headerFooterPacket.read(packet);
		// push the payload.
		payload.read(packet);
		
		// Once the payload is done and only the header portion has been
		// consumed, re-window headerFooterPacket over the footer bytes.
		if( !payload.hasRemaining() && headerFooterPacket.position()==RECORD_HEADER_SIZE ) {
			// NOTE(review): position already equals RECORD_HEADER_SIZE here
			// (the condition just checked it), so this call is redundant but
			// harmless.
			headerFooterPacket.position(RECORD_HEADER_SIZE);
			headerFooterPacket.limit(RECORD_BASE_SIZE);
			headerFooterPacket.read(packet);			
		}
		
	}
+
+    public void dispose() {
+        if( payload!=null ) {
+            payload.dispose();
+            payload=null;
+        }
+    }
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Record.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/RecordInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/RecordInfo.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/RecordInfo.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/RecordInfo.java Tue Feb 21 15:12:56 2006
@@ -1,59 +1,59 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-/**
- * @version $Revision: 1.1 $
- */
-final public class RecordInfo {
-
-    private final Location location;
-    private final Record header;
-    private final LogFileNode logFileState;
-    private final LogFile logFile;
-
-    public RecordInfo(Location location, Record header, LogFileNode logFileState, LogFile logFile) {
-        this.location = location;
-        this.header = header;
-        this.logFileState = logFileState;
-        this.logFile = logFile;
-    }
-
-    int getNextLocation() {
-        return location.getLogFileOffset() + header.getPayloadLength() + Record.RECORD_BASE_SIZE;
-    }
-
-    public Record getHeader() {
-        return header;
-    }
-
-    public Location getLocation() {
-        return location;
-    }
-
-    public LogFileNode getLogFileState() {
-        return logFileState;
-    }
-
-    public LogFile getLogFile() {
-        return logFile;
-    }
-
-    public int getDataOffset() {
-        return location.getLogFileOffset() + Record.RECORD_HEADER_SIZE;
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+final public class RecordInfo {
+
+    private final Location location;
+    private final Record header;
+    private final LogFileNode logFileState;
+    private final LogFile logFile;
+
+    public RecordInfo(Location location, Record header, LogFileNode logFileState, LogFile logFile) {
+        this.location = location;
+        this.header = header;
+        this.logFileState = logFileState;
+        this.logFile = logFile;
+    }
+
+    int getNextLocation() {
+        return location.getLogFileOffset() + header.getPayloadLength() + Record.RECORD_BASE_SIZE;
+    }
+
+    public Record getHeader() {
+        return header;
+    }
+
+    public Location getLocation() {
+        return location;
+    }
+
+    public LogFileNode getLogFileState() {
+        return logFileState;
+    }
+
+    public LogFile getLogFile() {
+        return logFile;
+    }
+
+    public int getDataOffset() {
+        return location.getLogFileOffset() + Record.RECORD_HEADER_SIZE;
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/RecordInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/HowlJournal.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/HowlJournal.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/HowlJournal.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/HowlJournal.java Tue Feb 21 15:12:56 2006
@@ -1,201 +1,201 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.howl;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-
-import org.apache.activeio.journal.InvalidRecordLocationException;
-import org.apache.activeio.journal.Journal;
-import org.apache.activeio.journal.JournalEventListener;
-import org.apache.activeio.journal.RecordLocation;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.Packet;
-import org.objectweb.howl.log.Configuration;
-import org.objectweb.howl.log.InvalidFileSetException;
-import org.objectweb.howl.log.InvalidLogBufferException;
-import org.objectweb.howl.log.InvalidLogKeyException;
-import org.objectweb.howl.log.LogConfigurationException;
-import org.objectweb.howl.log.LogEventListener;
-import org.objectweb.howl.log.LogRecord;
-import org.objectweb.howl.log.Logger;
-
-/**
- * An implementation of the Journal interface using a HOWL logger.  This is is a thin
- * wrapper around a HOWL logger.
- * 
- * This implementation can be used to write records but not to retreive them
- * yet. Once the HOWL logger implements the methods needed to retreive
- * previously stored records, this class can be completed.
- * 
- * @version $Revision: 1.2 $
- */
-public class HowlJournal implements Journal {
-
-	private final Logger logger;
-
-	private RecordLocation lastMark;
-
-	public HowlJournal(Configuration configuration)
-			throws InvalidFileSetException, LogConfigurationException,
-			InvalidLogBufferException, ClassNotFoundException, IOException,
-			InterruptedException {
-		this.logger = new Logger(configuration);
-		this.logger.open();
-		lastMark = new LongRecordLocation(logger.getActiveMark());
-	}
-
-	/**
-	 * @see org.apache.activeio.journal.Journal#write(byte[], boolean)
-	 */
-	public RecordLocation write(Packet packet, boolean sync) throws IOException {
-		try {
-			return new LongRecordLocation(logger.put(packet.sliceAsBytes(), sync));
-		} catch (InterruptedException e) {
-			throw (InterruptedIOException) new InterruptedIOException()
-					.initCause(e);
-		} catch (IOException e) {
-			throw e;
-		} catch (Exception e) {
-			throw (IOException) new IOException("Journal write failed: " + e)
-					.initCause(e);
-		}
-	}
-
-	/**
-	 * @see org.apache.activeio.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation, boolean)
-	 */
-	public void setMark(RecordLocation recordLocator, boolean force)
-			throws InvalidRecordLocationException, IOException {
-		try {
-			long location = toLong(recordLocator);
-			logger.mark(location, force);
-			lastMark = recordLocator;
-
-		} catch (InterruptedException e) {
-			throw (InterruptedIOException) new InterruptedIOException()
-					.initCause(e);
-		} catch (IOException e) {
-			throw e;
-		} catch (InvalidLogKeyException e) {
-			throw new InvalidRecordLocationException(e.getMessage(), e);
-		} catch (Exception e) {
-			throw (IOException) new IOException("Journal write failed: " + e)
-					.initCause(e);
-		}
-	}
-	
-	/**
-     * @param recordLocator
-     * @return
-     * @throws InvalidRecordLocationException
-     */
-    private long toLong(RecordLocation recordLocator) throws InvalidRecordLocationException {
-        if (recordLocator == null
-        		|| recordLocator.getClass() != LongRecordLocation.class)
-        	throw new InvalidRecordLocationException();
-
-        long location = ((LongRecordLocation) recordLocator)
-        		.getLongLocation();
-        return location;
-    }
-
-    /**
-	 * @see org.apache.activeio.journal.Journal#getMark()
-	 */
-	public RecordLocation getMark() {
-		return lastMark;
-	}
-
-	/**
-	 * @see org.apache.activeio.journal.Journal#close()
-	 */
-	public void close() throws IOException {
-		try {
-			logger.close();
-		} catch (IOException e) {
-			throw e;
-		} catch (InterruptedException e) {
-			throw (InterruptedIOException) new InterruptedIOException()
-					.initCause(e);
-		} catch (Exception e) {
-			throw (IOException) new IOException("Journal close failed: " + e)
-					.initCause(e);
-		}
-	}
-
-	/**
-	 * @see org.apache.activeio.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
-	 */
-	public void setJournalEventListener(final JournalEventListener eventListener) {
-		logger.setLogEventListener(new LogEventListener() {
-			public void logOverflowNotification(long key) {
-				eventListener.overflowNotification(new LongRecordLocation(key));
-			}
-		});
-	}
-
-	/**
-	 * @see org.apache.activeio.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
-	 */
-	public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
-			throws InvalidRecordLocationException {
-	    
-	    if( lastLocation ==null ) {
-	        if( this.lastMark !=null ) {
-	            lastLocation = lastMark;
-	        } else {
-	            return null;
-	        }
-	    }
-	    
-	    try {
-	        while(true) {
-	            LogRecord record = logger.get(null, toLong(lastLocation));
-		        // I assume getNext will return null if there is no next record. 
-	            LogRecord next = logger.getNext(record);
-	            if( next==null || next.length == 0 )
-	                return null;
-	            lastLocation = new LongRecordLocation(next.key);
-	            if( !next.isCTRL() )
-	                return lastLocation;
-	        }
-		} catch (Exception e) {
-			throw (InvalidRecordLocationException)new InvalidRecordLocationException().initCause(e);
-        }
-		
-	}
-
-	/**
-	 * @see org.apache.activeio.journal.Journal#read(org.codehaus.activemq.journal.RecordLocation)
-	 */
-	public Packet read(RecordLocation location)
-			throws InvalidRecordLocationException, IOException {
-	    
-	    try {
-            LogRecord record = logger.get(null, toLong(location));
-            return new ByteArrayPacket(record.data);            
-		} catch (InvalidLogKeyException e) {
-			throw new InvalidRecordLocationException(e.getMessage(), e);
-		} catch (Exception e) {
-			throw (IOException) new IOException("Journal write failed: " + e)
-					.initCause(e);
-		}
-		
-	}
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.howl;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalEventListener;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.objectweb.howl.log.Configuration;
+import org.objectweb.howl.log.InvalidFileSetException;
+import org.objectweb.howl.log.InvalidLogBufferException;
+import org.objectweb.howl.log.InvalidLogKeyException;
+import org.objectweb.howl.log.LogConfigurationException;
+import org.objectweb.howl.log.LogEventListener;
+import org.objectweb.howl.log.LogRecord;
+import org.objectweb.howl.log.Logger;
+
+/**
+ * An implementation of the Journal interface using a HOWL logger.  This is is a thin
+ * wrapper around a HOWL logger.
+ * 
+ * This implementation can be used to write records but not to retreive them
+ * yet. Once the HOWL logger implements the methods needed to retreive
+ * previously stored records, this class can be completed.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class HowlJournal implements Journal {
+
+	private final Logger logger;
+
+	private RecordLocation lastMark;
+
+	public HowlJournal(Configuration configuration)
+			throws InvalidFileSetException, LogConfigurationException,
+			InvalidLogBufferException, ClassNotFoundException, IOException,
+			InterruptedException {
+		this.logger = new Logger(configuration);
+		this.logger.open();
+		lastMark = new LongRecordLocation(logger.getActiveMark());
+	}
+
+	/**
+	 * @see org.apache.activeio.journal.Journal#write(byte[], boolean)
+	 */
+	public RecordLocation write(Packet packet, boolean sync) throws IOException {
+		try {
+			return new LongRecordLocation(logger.put(packet.sliceAsBytes(), sync));
+		} catch (InterruptedException e) {
+			throw (InterruptedIOException) new InterruptedIOException()
+					.initCause(e);
+		} catch (IOException e) {
+			throw e;
+		} catch (Exception e) {
+			throw (IOException) new IOException("Journal write failed: " + e)
+					.initCause(e);
+		}
+	}
+
+	/**
+	 * @see org.apache.activeio.journal.Journal#setMark(org.codehaus.activemq.journal.RecordLocation, boolean)
+	 */
+	public void setMark(RecordLocation recordLocator, boolean force)
+			throws InvalidRecordLocationException, IOException {
+		try {
+			long location = toLong(recordLocator);
+			logger.mark(location, force);
+			lastMark = recordLocator;
+
+		} catch (InterruptedException e) {
+			throw (InterruptedIOException) new InterruptedIOException()
+					.initCause(e);
+		} catch (IOException e) {
+			throw e;
+		} catch (InvalidLogKeyException e) {
+			throw new InvalidRecordLocationException(e.getMessage(), e);
+		} catch (Exception e) {
+			throw (IOException) new IOException("Journal write failed: " + e)
+					.initCause(e);
+		}
+	}
+	
+	/**
+     * @param recordLocator
+     * @return
+     * @throws InvalidRecordLocationException
+     */
+    private long toLong(RecordLocation recordLocator) throws InvalidRecordLocationException {
+        if (recordLocator == null
+        		|| recordLocator.getClass() != LongRecordLocation.class)
+        	throw new InvalidRecordLocationException();
+
+        long location = ((LongRecordLocation) recordLocator)
+        		.getLongLocation();
+        return location;
+    }
+
+    /**
+	 * @see org.apache.activeio.journal.Journal#getMark()
+	 */
+	public RecordLocation getMark() {
+		return lastMark;
+	}
+
+	/**
+	 * @see org.apache.activeio.journal.Journal#close()
+	 */
+	public void close() throws IOException {
+		try {
+			logger.close();
+		} catch (IOException e) {
+			throw e;
+		} catch (InterruptedException e) {
+			throw (InterruptedIOException) new InterruptedIOException()
+					.initCause(e);
+		} catch (Exception e) {
+			throw (IOException) new IOException("Journal close failed: " + e)
+					.initCause(e);
+		}
+	}
+
+	/**
+	 * @see org.apache.activeio.journal.Journal#setJournalEventListener(org.codehaus.activemq.journal.JournalEventListener)
+	 */
+	public void setJournalEventListener(final JournalEventListener eventListener) {
+		logger.setLogEventListener(new LogEventListener() {
+			public void logOverflowNotification(long key) {
+				eventListener.overflowNotification(new LongRecordLocation(key));
+			}
+		});
+	}
+
+	/**
+	 * @see org.apache.activeio.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation)
+	 */
+	public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
+			throws InvalidRecordLocationException {
+	    
+	    if( lastLocation ==null ) {
+	        if( this.lastMark !=null ) {
+	            lastLocation = lastMark;
+	        } else {
+	            return null;
+	        }
+	    }
+	    
+	    try {
+	        while(true) {
+	            LogRecord record = logger.get(null, toLong(lastLocation));
+		        // I assume getNext will return null if there is no next record. 
+	            LogRecord next = logger.getNext(record);
+	            if( next==null || next.length == 0 )
+	                return null;
+	            lastLocation = new LongRecordLocation(next.key);
+	            if( !next.isCTRL() )
+	                return lastLocation;
+	        }
+		} catch (Exception e) {
+			throw (InvalidRecordLocationException)new InvalidRecordLocationException().initCause(e);
+        }
+		
+	}
+
+	/**
+	 * @see org.apache.activeio.journal.Journal#read(org.codehaus.activemq.journal.RecordLocation)
+	 */
+	public Packet read(RecordLocation location)
+			throws InvalidRecordLocationException, IOException {
+	    
+	    try {
+            LogRecord record = logger.get(null, toLong(location));
+            return new ByteArrayPacket(record.data);            
+		} catch (InvalidLogKeyException e) {
+			throw new InvalidRecordLocationException(e.getMessage(), e);
+		} catch (Exception e) {
+			throw (IOException) new IOException("Journal write failed: " + e)
+					.initCause(e);
+		}
+		
+	}
+
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/howl/HowlJournal.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message