activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [5/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ activ...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/ControlFile.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/ControlFile.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/ControlFile.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/ControlFile.java Tue Feb 21 15:12:56 2006
@@ -1,190 +1,190 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.activeio.packet.ByteBufferPacket;
-import org.apache.activeio.packet.Packet;
-
-/**
- * Control file holds the last known good state of the journal.  It stores the state in 
- * record that is versioned and repeated twice in the file so that a failure in the
- * middle of the write of the first or second record does not result in an unknown
- * state. 
- * 
- * @version $Revision: 1.1 $
- */
-final public class ControlFile {
-
-    /** The File that holds the control data. */
-    private final RandomAccessFile file;
-    private final FileChannel channel;
-    private final ByteBufferPacket controlData;
-
-    private long controlDataVersion=0;
-    private FileLock lock;
-	private boolean disposed;
-    private static Set lockSet;
-    private String canonicalPath;
-
-    public ControlFile(File fileName, int controlDataSize) throws IOException {
-        canonicalPath = fileName.getCanonicalPath();
-        boolean existed = fileName.exists();        
-        file = new RandomAccessFile(fileName, "rw");
-        channel = file.getChannel();
-        controlData = new ByteBufferPacket(ByteBuffer.allocateDirect(controlDataSize));
-
-    }
-
-    /**
-     * Locks the control file.
-     * @throws IOException 
-     */
-    public void lock() throws IOException {
-        Set set = getVmLockSet();
-        synchronized (set) {
-            if (lock == null) {
-                if (!set.add(canonicalPath)) {
-                    throw new IOException("Journal is already opened by this application.");
-                }
-
-                lock = channel.tryLock();
-                if (lock == null) {
-                    set.remove(canonicalPath);
-                    throw new IOException("Journal is already opened by another application");
-                }
-            }
-        }
-    }
-
-    /**
- * Unlocks the control file.
-     * 
-     * @throws IOException
-     */
-    public void unlock() throws IOException {
-        Set set = getVmLockSet();
-        synchronized (set) {
-            if (lock != null) {
-                set.remove(canonicalPath);
-                lock.release();
-                lock = null;
-            }
-        }
-    }
-    
-    static private Set getVmLockSet() {
-        if ( lockSet == null ) { 
-            Properties properties = System.getProperties();
-            synchronized(properties) {
-                lockSet = (Set) properties.get("org.apache.activeio.journal.active.lockMap");
-                if( lockSet == null ) {
-                    lockSet = new HashSet();
-                }
-                properties.put("org.apache.activeio.journal.active.lockMap", lockSet);
-            }
-        }
-        return lockSet;
-    }
-
-    
-    public boolean load() throws IOException {
-        long l = file.length();
-        if( l < controlData.capacity() ) {
-            controlDataVersion=0;
-            controlData.position(0);
-            controlData.limit(0);
-            return false;
-        } else {            
-            file.seek(0);
-            long v1 = file.readLong();
-            file.seek(controlData.capacity()+8);
-            long v1check = file.readLong();
-            
-            file.seek(controlData.capacity()+16);
-            long v2 = file.readLong();
-            file.seek((controlData.capacity()*2)+24);
-            long v2check = file.readLong();
-            
-            if( v2 == v2check ) {
-                controlDataVersion = v2;
-                file.seek(controlData.capacity()+24);
-                controlData.clear();
-                channel.read(controlData.getByteBuffer());
-            } else if ( v1 == v1check ){
-                controlDataVersion = v1;
-                file.seek(controlData.capacity()+8);
-                controlData.clear();
-                channel.read(controlData.getByteBuffer());
-            } else {
-                // Bummer.. Both checks are screwed. We don't know
-                // if either of the two buffers is ok.  This should
-                // only happen if data got corrupted.
-                throw new IOException("Control data corrupted.");
-            }         
-            return true;
-        }
-    }
-    
-    public void store() throws IOException {
-        controlDataVersion++;
-        file.setLength((controlData.capacity()*2)+32);
-        file.seek(0);
-        
-        // Write the first copy of the control data.
-        file.writeLong(controlDataVersion);
-        controlData.clear();
-        channel.write(controlData.getByteBuffer());
-        file.writeLong(controlDataVersion);
-
-        // Write the second copy of the control data.
-        file.writeLong(controlDataVersion);
-        controlData.clear();
-        channel.write(controlData.getByteBuffer());
-        file.writeLong(controlDataVersion);
-        
-        channel.force(false);        
-    }
-
-    public Packet getControlData() {
-        controlData.clear();
-        return controlData;
-    }
-
-    public void dispose() {
-    	if( disposed )
-    		return;
-    	disposed=true;
-        try {
-            unlock();
-        } catch (IOException e) {
-        }
-        try {
-            file.close();
-        } catch (IOException e) {
-        }
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.Packet;
+
+/**
+ * Control file holds the last known good state of the journal.  It stores the state in 
+ * record that is versioned and repeated twice in the file so that a failure in the
+ * middle of the write of the first or second record does not result in an unknown
+ * state. 
+ * 
+ * @version $Revision: 1.1 $
+ */
+final public class ControlFile {
+
+    /** The File that holds the control data. */
+    private final RandomAccessFile file;
+    private final FileChannel channel;
+    private final ByteBufferPacket controlData;
+
+    private long controlDataVersion=0;
+    private FileLock lock;
+	private boolean disposed;
+    private static Set lockSet;
+    private String canonicalPath;
+
+    public ControlFile(File fileName, int controlDataSize) throws IOException {
+        canonicalPath = fileName.getCanonicalPath();
+        boolean existed = fileName.exists();        
+        file = new RandomAccessFile(fileName, "rw");
+        channel = file.getChannel();
+        controlData = new ByteBufferPacket(ByteBuffer.allocateDirect(controlDataSize));
+
+    }
+
+    /**
+     * Locks the control file.
+     * @throws IOException 
+     */
+    public void lock() throws IOException {
+        Set set = getVmLockSet();
+        synchronized (set) {
+            if (lock == null) {
+                if (!set.add(canonicalPath)) {
+                    throw new IOException("Journal is already opened by this application.");
+                }
+
+                lock = channel.tryLock();
+                if (lock == null) {
+                    set.remove(canonicalPath);
+                    throw new IOException("Journal is already opened by another application");
+                }
+            }
+        }
+    }
+
+    /**
+ * Unlocks the control file.
+     * 
+     * @throws IOException
+     */
+    public void unlock() throws IOException {
+        Set set = getVmLockSet();
+        synchronized (set) {
+            if (lock != null) {
+                set.remove(canonicalPath);
+                lock.release();
+                lock = null;
+            }
+        }
+    }
+    
+    static private Set getVmLockSet() {
+        if ( lockSet == null ) { 
+            Properties properties = System.getProperties();
+            synchronized(properties) {
+                lockSet = (Set) properties.get("org.apache.activeio.journal.active.lockMap");
+                if( lockSet == null ) {
+                    lockSet = new HashSet();
+                }
+                properties.put("org.apache.activeio.journal.active.lockMap", lockSet);
+            }
+        }
+        return lockSet;
+    }
+
+    
+    public boolean load() throws IOException {
+        long l = file.length();
+        if( l < controlData.capacity() ) {
+            controlDataVersion=0;
+            controlData.position(0);
+            controlData.limit(0);
+            return false;
+        } else {            
+            file.seek(0);
+            long v1 = file.readLong();
+            file.seek(controlData.capacity()+8);
+            long v1check = file.readLong();
+            
+            file.seek(controlData.capacity()+16);
+            long v2 = file.readLong();
+            file.seek((controlData.capacity()*2)+24);
+            long v2check = file.readLong();
+            
+            if( v2 == v2check ) {
+                controlDataVersion = v2;
+                file.seek(controlData.capacity()+24);
+                controlData.clear();
+                channel.read(controlData.getByteBuffer());
+            } else if ( v1 == v1check ){
+                controlDataVersion = v1;
+                file.seek(controlData.capacity()+8);
+                controlData.clear();
+                channel.read(controlData.getByteBuffer());
+            } else {
+                // Bummer.. Both checks are screwed. We don't know
+                // if either of the two buffers is ok.  This should
+                // only happen if data got corrupted.
+                throw new IOException("Control data corrupted.");
+            }         
+            return true;
+        }
+    }
+    
+    public void store() throws IOException {
+        controlDataVersion++;
+        file.setLength((controlData.capacity()*2)+32);
+        file.seek(0);
+        
+        // Write the first copy of the control data.
+        file.writeLong(controlDataVersion);
+        controlData.clear();
+        channel.write(controlData.getByteBuffer());
+        file.writeLong(controlDataVersion);
+
+        // Write the second copy of the control data.
+        file.writeLong(controlDataVersion);
+        controlData.clear();
+        channel.write(controlData.getByteBuffer());
+        file.writeLong(controlDataVersion);
+        
+        channel.force(false);        
+    }
+
+    public Packet getControlData() {
+        controlData.clear();
+        return controlData;
+    }
+
+    public void dispose() {
+    	if( disposed )
+    		return;
+    	disposed=true;
+        try {
+            unlock();
+        } catch (IOException e) {
+        }
+        try {
+            file.close();
+        } catch (IOException e) {
+        }
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/ControlFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/JournalImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/JournalImpl.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/JournalImpl.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/JournalImpl.java Tue Feb 21 15:12:56 2006
@@ -1,461 +1,461 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-
-import org.apache.activeio.journal.InvalidRecordLocationException;
-import org.apache.activeio.journal.Journal;
-import org.apache.activeio.journal.JournalEventListener;
-import org.apache.activeio.journal.RecordLocation;
-import org.apache.activeio.packet.ByteArrayPacket;
-import org.apache.activeio.packet.ByteBufferPacketPool;
-import org.apache.activeio.packet.Packet;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Callable;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
-import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
-import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-
-/**
- * A high speed Journal implementation. Inspired by the ideas of the <a
- * href="http://howl.objectweb.org/">Howl </a> project but tailored to the needs
- * of ActiveMQ. <p/>This Journal provides the following features:
- * <ul>
- * <li>Concurrent writes are batched into a single write/force done by a
- * background thread.</li>
- * <li>Uses preallocated logs to avoid disk fragmentation and performance
- * degradation.</li>
- * <li>The number and size of the preallocated logs are configurable.</li>
- * <li>Uses direct ByteBuffers to write data to log files.</li>
- * <li>Allows logs to grow in case of an overflow condition so that overflow
- * exceptions are not thrown. Grown logs that are inactive (due to a new
- * mark) are resized to their original size.</li>
- * <li>No limit on the size of the record written to the journal</li>
- * <li>Should be possible to extend so that multiple physical disks are used
- * concurrently to increase throughput and decrease latency.</li>
- * </ul>
- * <p/>
- * 
- * @version $Revision: 1.1 $
- */
-final public class JournalImpl implements Journal {
-
-    public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultPoolSize", ""+(5)));
-    public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4)));
-
-    static final private int OVERFLOW_RENOTIFICATION_DELAY = 500;
-    
-    static private ByteBufferPacketPool lastPool;
-    
-    private boolean disposed = false;
-
-    // The id of the current log file that is being filled.
-    private int appendLogFileId = 0;
-
-    // The offset in the current log file that is being filled.
-    private int appendLogFileOffset = 0;
-
-    // Used to batch writes together.
-    private BatchedWrite pendingBatchWrite;
-    
-    private Location lastMarkedLocation;
-    private LogFileManager file;
-    private ThreadPoolExecutor executor;
-    private int rolloverFence;
-    private JournalEventListener eventListener;
-    private ByteBufferPacketPool packetPool;
-    private long overflowNotificationTime = System.currentTimeMillis();
-    private Packet markPacket = new ByteArrayPacket(new byte[Location.SERIALIZED_SIZE]);
-
-    public JournalImpl(File logDirectory) throws IOException {
-        this(new LogFileManager(logDirectory));
-    }
-
-    public JournalImpl(File logDirectory, int logFileCount, int logFileSize) throws IOException {
-        this(new LogFileManager(logDirectory, logFileCount, logFileSize, null));
-    }
-    
-    public JournalImpl(File logDirectory, int logFileCount, int logFileSize, File archiveDirectory) throws IOException {
-        this(new LogFileManager(logDirectory, logFileCount, logFileSize, archiveDirectory));
-    }
-
-    public JournalImpl(LogFileManager logFile) {
-        this.file = logFile;
-        this.packetPool = createBufferPool();
-        this.executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread answer = new Thread(runnable, "Journal Writer");
-                answer.setPriority(Thread.MAX_PRIORITY);
-                answer.setDaemon(true);
-                return answer;
-            }
-        });
-        executor.allowCoreThreadTimeOut(true);
-        
-        lastMarkedLocation = file.getLastMarkedRecordLocation();
-        Location nextAppendLocation = file.getNextAppendLocation();
-        appendLogFileId = nextAppendLocation.getLogFileId();
-        appendLogFileOffset = nextAppendLocation.getLogFileOffset();
-        
-        rolloverFence = (file.getInitialLogFileSize() / 10) * 9;
-    }
-
-    
-    /**
-     * When running unit tests we may not be able to create new pools fast enough
-     * since the old pools are not being gc'ed fast enough.  So we pool the pool.
-     * @return
-     */
-    synchronized static private ByteBufferPacketPool createBufferPool() {
-        if( lastPool !=null ) {
-            ByteBufferPacketPool rc = lastPool;
-            lastPool = null;
-            return rc;
-        } else { 
-            return new ByteBufferPacketPool(DEFAULT_POOL_SIZE, DEFAULT_PACKET_SIZE);
-        }
-    }
-    
-    /**
-     * When running unit tests we may not be able to create new pools fast enough
-     * since the old pools are not being gc'ed fast enough.  So we pool the pool.
-     * @return
-     */
-    synchronized static private void disposeBufferPool(ByteBufferPacketPool pool) {
-        if( lastPool!=null ) {
-            pool.dispose();
-        } else {
-            pool.waitForPacketsToReturn();
-            lastPool = pool;
-        }
-    }
-
-
-
-    public RecordLocation write(Packet data, boolean sync) throws IOException {
-        return write(LogFileManager.DATA_RECORD_TYPE, data, sync, null);
-    }
-
-    private Location write(byte recordType, Packet data, boolean sync, Location mark) throws IOException {
-        try {
-            Location location;
-            BatchedWrite writeCommand;
-            
-            Record record = new Record(recordType, data, mark);
-            
-            // The following synchronized block is the bottle neck of the journal.  Make this
-            // code faster and the journal should speed up.
-            synchronized (this) {
-                if (disposed) {
-                    throw new IOException("Journal has been closed.");
-                }
-
-                // Create our record
-                location = new Location(appendLogFileId, appendLogFileOffset);
-                record.setLocation(location);
-                
-                // Piggy back the packet on the pending write batch.
-                writeCommand = addToPendingWriteBatch(record, mark, sync);
-
-                // Update where the next record will land.
-                appendLogFileOffset += data.limit() + Record.RECORD_BASE_SIZE;
-                rolloverCheck();
-            }
-
-            if (sync) {
-                writeCommand.waitForForce();
-            }
-
-            return location;
-        } catch (IOException e) {
-            throw e;
-        } catch (InterruptedException e) {
-            throw (IOException) new InterruptedIOException().initCause(e);
-        } catch (Throwable e) {
-            throw (IOException) new IOException("Write failed: " + e).initCause(e);
-        }
-    }
-
-    /**
-     * @param record
-     * @return
-     * @throws InterruptedException
-     */
-    private BatchedWrite addToPendingWriteBatch(Record record, Location mark, boolean force) throws InterruptedException {
-
-        // Load the write batch up with data from our record.
-        // it may take more than one write batch if the record is large.
-        BatchedWrite answer = null;
-        while (record.hasRemaining()) {
-            
-            // Do we need another BatchWrite?
-            boolean queueTheWrite=false;
-            if (pendingBatchWrite == null) {
-                pendingBatchWrite =  new BatchedWrite(packetPool.getPacket());
-                queueTheWrite = true;
-            }
-            answer = pendingBatchWrite;
-
-            // Can we continue to use the pendingBatchWrite?
-            boolean full = !pendingBatchWrite.append(record, mark, force);
-            
-            if( queueTheWrite ) {
-                final BatchedWrite queuedWrite = pendingBatchWrite;
-                executor.execute(new Runnable() {
-                    public void run() {
-                        try {
-                            queuedWrite(queuedWrite);
-                        } catch (InterruptedException e) {
-                        }
-                    }
-                });
-            }
-            
-            if( full )
-                pendingBatchWrite = null;            
-        }
-        return answer;
-
-    }
-
-    /**
-     * This is a blocking call
-     * 
-     * @param write
-     * @throws InterruptedException
-     */
-    private void queuedWrite(BatchedWrite write) throws InterruptedException {
-
-        // Stop other threads from appending more pendingBatchWrite.
-        write.flip();
-
-        // Do the write.
-        try {
-            file.append(write);
-            write.forced();
-        } catch (Throwable e) {
-            write.writeFailed(e);
-        } finally {
-            write.getPacket().dispose();
-        }
-    }
-
-    /**
-     * 
-     */
-    private void rolloverCheck() throws IOException {
-
-        // See if we need to issue an overflow notification.
-        if (eventListener != null && file.isPastHalfActive()
-                && overflowNotificationTime + OVERFLOW_RENOTIFICATION_DELAY < System.currentTimeMillis()) {
-
-            // We need to send an overflow notification to free up
-            // some logFiles.
-            Location safeSpot = file.getFirstRecordLocationOfSecondActiveLogFile();
-            eventListener.overflowNotification(safeSpot);
-            overflowNotificationTime = System.currentTimeMillis();
-        }
-
-        // Is it time to roll over?
-        if (appendLogFileOffset > rolloverFence ) {
-
-            // Can we roll over?
-            if ( !file.canActivateNextLogFile() ) {
-                // don't delay the next overflow notification.
-                overflowNotificationTime -= OVERFLOW_RENOTIFICATION_DELAY;
-                
-            } else {
-                
-                try {
-                    final FutureTask result = new FutureTask(new Callable() {
-                        public Object call() throws Exception {
-                            return queuedActivateNextLogFile();
-                        }});
-                    executor.execute(result);
-                    Location location = (Location) result.get();
-                    appendLogFileId = location.getLogFileId();
-                    appendLogFileOffset = location.getLogFileOffset();
-    
-                } catch (InterruptedException e) {
-                    throw (IOException) new IOException("Interrupted.").initCause(e);
-                }
-                catch (ExecutionException e) {
-                    throw handleExecutionException(e);
-                }
-            }
-        }
-    }
-
-    /**
-     * This is a blocking call
-     */
-    private Location queuedActivateNextLogFile() throws IOException {
-        file.activateNextLogFile();
-        return file.getNextAppendLocation();
-    }
-
-    
-    
-    /**
-     * @param recordLocator
-     * @param force
-     * @return
-     * @throws InvalidRecordLocationException
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    synchronized public void setMark(RecordLocation l, boolean force) throws InvalidRecordLocationException,
-            IOException {
-        
-        Location location = (Location) l;
-        if (location == null)
-            throw new InvalidRecordLocationException("The location cannot be null.");
-        if (lastMarkedLocation != null && location.compareTo(lastMarkedLocation) < 0)
-            throw new InvalidRecordLocationException("The location is less than the last mark.");
-        
-        markPacket.clear();
-        location.writeToPacket(markPacket);    
-        markPacket.flip();
-        write(LogFileManager.MARK_RECORD_TYPE, markPacket, force, location);
-        
-        lastMarkedLocation = location;
-    }
-
-    /**
-     * @return
-     */
-    public RecordLocation getMark() {
-        return lastMarkedLocation;
-    }
-
-    /**
-     * @param lastLocation
-     * @return
-     * @throws IOException
-     * @throws InvalidRecordLocationException
-     */
-    public RecordLocation getNextRecordLocation(final RecordLocation lastLocation) throws IOException,
-            InvalidRecordLocationException {
-        
-        if (lastLocation == null) {
-            if (lastMarkedLocation != null) {
-                return lastMarkedLocation;
-            } else {
-                return file.getFirstActiveLogLocation();
-            }
-        }
-
-        // Run this in the queued executor thread.
-        try {
-            final FutureTask result = new FutureTask(new Callable() {
-                public Object call() throws Exception {
-                    return queuedGetNextRecordLocation((Location) lastLocation);
-                }});
-            executor.execute(result);
-            return (Location) result.get();
-        } catch (InterruptedException e) {
-            throw (IOException) new IOException("Interrupted.").initCause(e);
-        }
-        catch (ExecutionException e) {
-            throw handleExecutionException(e);
-        }
-    }
-
-    protected IOException handleExecutionException(ExecutionException e) throws IOException {
-        Throwable cause = e.getCause();
-        if (cause instanceof IOException) {
-            return (IOException) cause;
-        }
-        else {
-            return (IOException) new IOException(cause.getMessage()).initCause(cause);
-        }
-    }
-
-    private Location queuedGetNextRecordLocation(Location location) throws IOException, InvalidRecordLocationException {
-        return file.getNextDataRecordLocation(location);
-    }
-
-    /**
-     * @param location
-     * @return
-     * @throws InvalidRecordLocationException
-     * @throws IOException
-     */
-    public Packet read(final RecordLocation l) throws IOException, InvalidRecordLocationException {
-        final Location location = (Location) l;
-        // Run this in the queued executor thread.
-        try {
-            final FutureTask result = new FutureTask(new Callable() {
-                public Object call() throws Exception {
-                    return file.readPacket(location);
-                }});
-            executor.execute(result);
-            return (Packet) result.get();
-        } catch (InterruptedException e) {
-            throw (IOException) new IOException("Interrupted.").initCause(e);
-        }
-        catch (ExecutionException e) {
-            throw handleExecutionException(e);
-        }
-    }
-
-    public void setJournalEventListener(JournalEventListener eventListener) {
-        this.eventListener = eventListener;
-    }
-
-    /**
-     * @deprecated @see #dispose()
-     */
-    public void close() throws IOException {
-    	dispose();
-    }
-    
-    /**
-     */
-    public void dispose() {
-        if (disposed)
-            return;
-        disposed=true;
-        executor.shutdown();
-        file.dispose();
-        ByteBufferPacketPool pool = packetPool;
-        packetPool=null;
-        disposeBufferPool(pool);
-    }
-
-    /**
-     * @return
-     */
-    public File getLogDirectory() {
-        return file.getLogDirectory();
-    }
-
-    public int getInitialLogFileSize() {
-        return file.getInitialLogFileSize();
-    }
-    
-    public String toString() {
-        return "Active Journal: using "+file.getOnlineLogFileCount()+" x " + (file.getInitialLogFileSize()/(1024*1024f)) + " Megs at: " + getLogDirectory();
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalEventListener;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.ByteBufferPacketPool;
+import org.apache.activeio.packet.Packet;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Callable;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
+import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+/**
+ * A high speed Journal implementation. Inspired by the ideas of the <a
+ * href="http://howl.objectweb.org/">Howl </a> project but tailored to the needs
+ * of ActiveMQ. <p/>This Journal provides the following features:
+ * <ul>
+ * <li>Concurrent writes are batched into a single write/force done by a
+ * background thread.</li>
+ * <li>Uses preallocated logs to avoid disk fragmentation and performance
+ * degradation.</li>
+ * <li>The number and size of the preallocated logs are configurable.</li>
+ * <li>Uses direct ByteBuffers to write data to log files.</li>
+ * <li>Allows logs to grow in case of an overflow condition so that overflow
+ * exceptions are not thrown. Grown logs that are inactive (due to a new
+ * mark) are resized to their original size.</li>
+ * <li>No limit on the size of the record written to the journal</li>
+ * <li>Should be possible to extend so that multiple physical disk are used
+ * concurrently to increase throughput and decrease latency.</li>
+ * </ul>
+ * <p/>
+ * 
+ * @version $Revision: 1.1 $
+ */
+final public class JournalImpl implements Journal {
+
+    // Number of packets kept in the write-buffer pool; overridable via system property.
+    public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultPoolSize", ""+(5)));
+    // Size in bytes of each pooled write packet (4 MB default); overridable via system property.
+    public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.apache.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4)));
+
+    // Minimum delay in ms between successive overflow notifications to the event listener.
+    static final private int OVERFLOW_RENOTIFICATION_DELAY = 500;
+    
+    // Cached pool shared between Journal instances; see createBufferPool()/disposeBufferPool().
+    static private ByteBufferPacketPool lastPool;
+    
+    private boolean disposed = false;
+
+    // The id of the current log file that is being filled.
+    private int appendLogFileId = 0;
+
+    // The offset in the current log file that is being filled.
+    private int appendLogFileOffset = 0;
+
+    // Used to batch writes together.
+    private BatchedWrite pendingBatchWrite;
+    
+    private Location lastMarkedLocation;
+    private LogFileManager file;
+    private ThreadPoolExecutor executor;
+    // Offset past which the next log file is activated; set to 90% of the initial file size.
+    private int rolloverFence;
+    private JournalEventListener eventListener;
+    private ByteBufferPacketPool packetPool;
+    private long overflowNotificationTime = System.currentTimeMillis();
+    // Reusable packet for serializing mark locations; only touched by the synchronized setMark().
+    private Packet markPacket = new ByteArrayPacket(new byte[Location.SERIALIZED_SIZE]);
+
+    public JournalImpl(File logDirectory) throws IOException {
+        this(new LogFileManager(logDirectory));
+    }
+
+    public JournalImpl(File logDirectory, int logFileCount, int logFileSize) throws IOException {
+        this(new LogFileManager(logDirectory, logFileCount, logFileSize, null));
+    }
+    
+    public JournalImpl(File logDirectory, int logFileCount, int logFileSize, File archiveDirectory) throws IOException {
+        this(new LogFileManager(logDirectory, logFileCount, logFileSize, archiveDirectory));
+    }
+
+    public JournalImpl(LogFileManager logFile) {
+        this.file = logFile;
+        this.packetPool = createBufferPool();
+        // A single daemon thread performs all appends so that concurrent writes
+        // get batched into one write/force; MAX_PRIORITY keeps the disk busy.
+        this.executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread answer = new Thread(runnable, "Journal Writer");
+                answer.setPriority(Thread.MAX_PRIORITY);
+                answer.setDaemon(true);
+                return answer;
+            }
+        });
+        executor.allowCoreThreadTimeOut(true);
+        
+        // Resume the append position from where the previous run left off.
+        lastMarkedLocation = file.getLastMarkedRecordLocation();
+        Location nextAppendLocation = file.getNextAppendLocation();
+        appendLogFileId = nextAppendLocation.getLogFileId();
+        appendLogFileOffset = nextAppendLocation.getLogFileOffset();
+        
+        rolloverFence = (file.getInitialLogFileSize() / 10) * 9;
+    }
+
+    
+    /**
+     * When running unit tests we may not be able to create new pools fast enough
+     * since the old pools are not being gc'ed fast enough.  So we pool the pool.
+     * @return a recycled pool if one is cached, otherwise a freshly created one
+     */
+    synchronized static private ByteBufferPacketPool createBufferPool() {
+        if( lastPool !=null ) {
+            ByteBufferPacketPool rc = lastPool;
+            lastPool = null;
+            return rc;
+        } else { 
+            return new ByteBufferPacketPool(DEFAULT_POOL_SIZE, DEFAULT_PACKET_SIZE);
+        }
+    }
+    
+    /**
+     * Counterpart of createBufferPool(): returns the pool to the static cache,
+     * or disposes it when the cache slot is already occupied.
+     */
+    synchronized static private void disposeBufferPool(ByteBufferPacketPool pool) {
+        if( lastPool!=null ) {
+            pool.dispose();
+        } else {
+            // Wait for outstanding packets so the cached pool is fully reusable.
+            pool.waitForPacketsToReturn();
+            lastPool = pool;
+        }
+    }
+
+
+
+    /**
+     * Appends a data record to the journal.  When sync is true, this call
+     * blocks until the batch holding the record reports it has been forced.
+     */
+    public RecordLocation write(Packet data, boolean sync) throws IOException {
+        return write(LogFileManager.DATA_RECORD_TYPE, data, sync, null);
+    }
+
+    /**
+     * Common write path for data and mark records.  Reserves the record's
+     * location under the journal lock, then piggybacks the payload on the
+     * pending batch handled by the writer thread.
+     */
+    private Location write(byte recordType, Packet data, boolean sync, Location mark) throws IOException {
+        try {
+            Location location;
+            BatchedWrite writeCommand;
+            
+            Record record = new Record(recordType, data, mark);
+            
+            // The following synchronized block is the bottle neck of the journal.  Make this
+            // code faster and the journal should speed up.
+            synchronized (this) {
+                if (disposed) {
+                    throw new IOException("Journal has been closed.");
+                }
+
+                // Create our record
+                location = new Location(appendLogFileId, appendLogFileOffset);
+                record.setLocation(location);
+                
+                // Piggy back the packet on the pending write batch.
+                writeCommand = addToPendingWriteBatch(record, mark, sync);
+
+                // Update where the next record will land.
+                appendLogFileOffset += data.limit() + Record.RECORD_BASE_SIZE;
+                rolloverCheck();
+            }
+
+            // Waiting happens outside the lock so other writers can keep batching.
+            if (sync) {
+                writeCommand.waitForForce();
+            }
+
+            return location;
+        } catch (IOException e) {
+            throw e;
+        } catch (InterruptedException e) {
+            // NOTE(review): the thread's interrupt status is not restored here — confirm intended.
+            throw (IOException) new InterruptedIOException().initCause(e);
+        } catch (Throwable e) {
+            throw (IOException) new IOException("Write failed: " + e).initCause(e);
+        }
+    }
+
+    /**
+     * Loads the record into one or more batch writes (a large record may span
+     * several batches) and queues each newly created batch on the writer thread.
+     *
+     * @return the batch holding the record's final piece; callers wait on it for the force
+     * @throws InterruptedException
+     */
+    private BatchedWrite addToPendingWriteBatch(Record record, Location mark, boolean force) throws InterruptedException {
+
+        // Load the write batch up with data from our record.
+        // it may take more than one write batch if the record is large.
+        BatchedWrite answer = null;
+        while (record.hasRemaining()) {
+            
+            // Do we need another BatchWrite?
+            boolean queueTheWrite=false;
+            if (pendingBatchWrite == null) {
+                pendingBatchWrite =  new BatchedWrite(packetPool.getPacket());
+                queueTheWrite = true;
+            }
+            answer = pendingBatchWrite;
+
+            // Can we continue to use the pendingBatchWrite?
+            boolean full = !pendingBatchWrite.append(record, mark, force);
+            
+            if( queueTheWrite ) {
+                final BatchedWrite queuedWrite = pendingBatchWrite;
+                executor.execute(new Runnable() {
+                    public void run() {
+                        try {
+                            queuedWrite(queuedWrite);
+                        } catch (InterruptedException e) {
+                            // NOTE(review): an interrupt of the writer thread is silently
+                            // dropped here; threads waiting on this batch's force could
+                            // then block.  Confirm this is intended.
+                        }
+                    }
+                });
+            }
+            
+            if( full )
+                pendingBatchWrite = null;            
+        }
+        return answer;
+
+    }
+
+    /**
+     * This is a blocking call
+     * 
+     * @param write
+     * @throws InterruptedException
+     */
+    private void queuedWrite(BatchedWrite write) throws InterruptedException {
+
+        // Stop other threads from appending more pendingBatchWrite.
+        write.flip();
+
+        // Do the write.
+        try {
+            file.append(write);
+            write.forced();
+        } catch (Throwable e) {
+            // Propagate the failure to the threads waiting on this batch.
+            write.writeFailed(e);
+        } finally {
+            write.getPacket().dispose();
+        }
+    }
+
+    /**
+     * Called under the journal lock after every append: issues overflow
+     * notifications and activates the next log file once the fence is crossed.
+     */
+    private void rolloverCheck() throws IOException {
+
+        // See if we need to issue an overflow notification.
+        if (eventListener != null && file.isPastHalfActive()
+                && overflowNotificationTime + OVERFLOW_RENOTIFICATION_DELAY < System.currentTimeMillis()) {
+
+            // We need to send an overflow notification to free up
+            // some logFiles.
+            Location safeSpot = file.getFirstRecordLocationOfSecondActiveLogFile();
+            eventListener.overflowNotification(safeSpot);
+            overflowNotificationTime = System.currentTimeMillis();
+        }
+
+        // Is it time to roll over?
+        if (appendLogFileOffset > rolloverFence ) {
+
+            // Can we roll over?
+            if ( !file.canActivateNextLogFile() ) {
+                // don't delay the next overflow notification.
+                overflowNotificationTime -= OVERFLOW_RENOTIFICATION_DELAY;
+                
+            } else {
+                
+                // Activate the next file on the writer thread so it happens
+                // after all queued appends to the current file.
+                try {
+                    final FutureTask result = new FutureTask(new Callable() {
+                        public Object call() throws Exception {
+                            return queuedActivateNextLogFile();
+                        }});
+                    executor.execute(result);
+                    Location location = (Location) result.get();
+                    appendLogFileId = location.getLogFileId();
+                    appendLogFileOffset = location.getLogFileOffset();
+    
+                } catch (InterruptedException e) {
+                    // NOTE(review): interrupt status is not restored — confirm intended.
+                    throw (IOException) new IOException("Interrupted.").initCause(e);
+                }
+                catch (ExecutionException e) {
+                    throw handleExecutionException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * This is a blocking call
+     */
+    private Location queuedActivateNextLogFile() throws IOException {
+        file.activateNextLogFile();
+        return file.getNextAppendLocation();
+    }
+
+    
+    
+    /**
+     * Marks the given location as the journal's recovery point.  The mark
+     * record is written through the normal write path.
+     *
+     * @param l the location to mark; must not be null or before the last mark
+     * @param force when true, blocks until the mark record has been forced
+     * @throws InvalidRecordLocationException if l is null or before the last mark
+     * @throws IOException if the write fails
+     */
+    synchronized public void setMark(RecordLocation l, boolean force) throws InvalidRecordLocationException,
+            IOException {
+        
+        Location location = (Location) l;
+        if (location == null)
+            throw new InvalidRecordLocationException("The location cannot be null.");
+        if (lastMarkedLocation != null && location.compareTo(lastMarkedLocation) < 0)
+            throw new InvalidRecordLocationException("The location is less than the last mark.");
+        
+        markPacket.clear();
+        location.writeToPacket(markPacket);    
+        markPacket.flip();
+        write(LogFileManager.MARK_RECORD_TYPE, markPacket, force, location);
+        
+        lastMarkedLocation = location;
+    }
+
+    /**
+     * @return the last marked location (may be null when nothing has been marked)
+     */
+    public RecordLocation getMark() {
+        return lastMarkedLocation;
+    }
+
+    /**
+     * Finds the data record following lastLocation.  A null lastLocation starts
+     * from the last mark, or from the first active log location when unmarked.
+     *
+     * @param lastLocation the previous record's location, or null
+     * @return the next data record's location
+     * @throws IOException
+     * @throws InvalidRecordLocationException
+     */
+    public RecordLocation getNextRecordLocation(final RecordLocation lastLocation) throws IOException,
+            InvalidRecordLocationException {
+        
+        if (lastLocation == null) {
+            if (lastMarkedLocation != null) {
+                return lastMarkedLocation;
+            } else {
+                return file.getFirstActiveLogLocation();
+            }
+        }
+
+        // Run this in the queued executor thread.
+        try {
+            final FutureTask result = new FutureTask(new Callable() {
+                public Object call() throws Exception {
+                    return queuedGetNextRecordLocation((Location) lastLocation);
+                }});
+            executor.execute(result);
+            return (Location) result.get();
+        } catch (InterruptedException e) {
+            // NOTE(review): interrupt status is not restored — confirm intended.
+            throw (IOException) new IOException("Interrupted.").initCause(e);
+        }
+        catch (ExecutionException e) {
+            throw handleExecutionException(e);
+        }
+    }
+
+    /**
+     * Unwraps an ExecutionException from the writer thread: returns the
+     * IOException cause directly, or wraps any other cause in an IOException.
+     */
+    protected IOException handleExecutionException(ExecutionException e) throws IOException {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+            return (IOException) cause;
+        }
+        else {
+            return (IOException) new IOException(cause.getMessage()).initCause(cause);
+        }
+    }
+
+    private Location queuedGetNextRecordLocation(Location location) throws IOException, InvalidRecordLocationException {
+        return file.getNextDataRecordLocation(location);
+    }
+
+    /**
+     * Reads the record stored at the given location.  The read is performed on
+     * the writer thread so it is serialized with pending appends.
+     *
+     * @param l the location to read
+     * @return the record's payload
+     * @throws InvalidRecordLocationException
+     * @throws IOException
+     */
+    public Packet read(final RecordLocation l) throws IOException, InvalidRecordLocationException {
+        final Location location = (Location) l;
+        // Run this in the queued executor thread.
+        try {
+            final FutureTask result = new FutureTask(new Callable() {
+                public Object call() throws Exception {
+                    return file.readPacket(location);
+                }});
+            executor.execute(result);
+            return (Packet) result.get();
+        } catch (InterruptedException e) {
+            // NOTE(review): interrupt status is not restored — confirm intended.
+            throw (IOException) new IOException("Interrupted.").initCause(e);
+        }
+        catch (ExecutionException e) {
+            throw handleExecutionException(e);
+        }
+    }
+
+    public void setJournalEventListener(JournalEventListener eventListener) {
+        this.eventListener = eventListener;
+    }
+
+    /**
+     * @deprecated @see #dispose()
+     */
+    public void close() throws IOException {
+    	dispose();
+    }
+    
+    /**
+     * Releases the journal's resources: the writer thread, the log files and
+     * the packet pool.  Safe to call more than once.
+     */
+    public void dispose() {
+        if (disposed)
+            return;
+        disposed=true;
+        // NOTE(review): shutdown() does not wait for queued write tasks to
+        // finish before the file and pool are disposed below — confirm intended.
+        executor.shutdown();
+        file.dispose();
+        ByteBufferPacketPool pool = packetPool;
+        packetPool=null;
+        disposeBufferPool(pool);
+    }
+
+    /**
+     * @return the directory holding the journal's log files
+     */
+    public File getLogDirectory() {
+        return file.getLogDirectory();
+    }
+
+    public int getInitialLogFileSize() {
+        return file.getInitialLogFileSize();
+    }
+    
+    public String toString() {
+        return "Active Journal: using "+file.getOnlineLogFileCount()+" x " + (file.getInitialLogFileSize()/(1024*1024f)) + " Megs at: " + getLogDirectory();
+    }
+
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/JournalImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Location.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Location.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Location.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Location.java Tue Feb 21 15:12:56 2006
@@ -1,96 +1,96 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.activeio.journal.RecordLocation;
-import org.apache.activeio.packet.Packet;
-import org.apache.activeio.packet.PacketData;
-
-/**
- * Defines a where a record can be located in the Journal.
- * 
- * @version $Revision: 1.1 $
- */
-final public class Location implements RecordLocation {
-    
-    static final public int SERIALIZED_SIZE=8;
-
-    final private int logFileId;
-    final private int logFileOffset;
-
-    public Location(int logFileId, int fileOffset) {
-        this.logFileId = logFileId;
-        this.logFileOffset = fileOffset;
-    }
-
-    public int compareTo(Object o) {
-        int rc = logFileId - ((Location) o).logFileId;
-        if (rc != 0)
-            return rc;
-
-        return logFileOffset - ((Location) o).logFileOffset;
-    }
-
-    public int hashCode() {
-        return logFileOffset ^ logFileId;
-    }
-
-    public boolean equals(Object o) {
-        if (o == null || o.getClass() != Location.class)
-            return false;
-        Location rl = (Location) o;
-        return rl.logFileId == this.logFileId && rl.logFileOffset == this.logFileOffset;
-    }
-
-    public String toString() {
-        return "" + logFileId + ":" + logFileOffset;
-    }
-
-    public int getLogFileId() {
-        return logFileId;
-    }
-
-    public int getLogFileOffset() {
-        return logFileOffset;
-    }
-    
-    public void writeToPacket(Packet packet) throws IOException {
-        PacketData data = new PacketData(packet);
-        data.writeInt(logFileId);
-        data.writeInt(logFileOffset);
-    }
-
-    public void writeToDataOutput(DataOutput data) throws IOException {
-        data.writeInt(logFileId);
-        data.writeInt(logFileOffset);
-    }    
-
-    static public Location readFromPacket(Packet packet) throws IOException {
-        PacketData data = new PacketData(packet);
-        return new Location(data.readInt(), data.readInt());
-    }
-
-    public static Location readFromDataInput(DataInput data) throws IOException {
-        return new Location(data.readInt(), data.readInt());
-    }
-
-    
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.PacketData;
+
+/**
+ * Defines where a record can be located in the Journal.
+ * 
+ * @version $Revision: 1.1 $
+ */
+final public class Location implements RecordLocation {
+    
+    /** Serialized footprint of a Location: two 4-byte integers. */
+    static final public int SERIALIZED_SIZE=8;
+
+    // Which log file holds the record, and the record's byte offset within it.
+    final private int logFileId;
+    final private int logFileOffset;
+
+    public Location(int logFileId, int fileOffset) {
+        this.logFileId = logFileId;
+        this.logFileOffset = fileOffset;
+    }
+
+    /**
+     * Orders locations by log file id first, then by offset within the file.
+     * Only the sign of the result is significant.
+     */
+    public int compareTo(Object o) {
+        Location other = (Location) o;
+        // Compare without subtraction so that extreme id/offset values cannot
+        // overflow int and yield a result with the wrong sign.
+        if (logFileId != other.logFileId)
+            return logFileId < other.logFileId ? -1 : 1;
+        if (logFileOffset != other.logFileOffset)
+            return logFileOffset < other.logFileOffset ? -1 : 1;
+        return 0;
+    }
+
+    public int hashCode() {
+        return logFileOffset ^ logFileId;
+    }
+
+    public boolean equals(Object o) {
+        if (o == null || o.getClass() != Location.class)
+            return false;
+        Location rl = (Location) o;
+        return rl.logFileId == this.logFileId && rl.logFileOffset == this.logFileOffset;
+    }
+
+    public String toString() {
+        return "" + logFileId + ":" + logFileOffset;
+    }
+
+    public int getLogFileId() {
+        return logFileId;
+    }
+
+    public int getLogFileOffset() {
+        return logFileOffset;
+    }
+    
+    /** Writes this location (id then offset) to the given packet. */
+    public void writeToPacket(Packet packet) throws IOException {
+        PacketData data = new PacketData(packet);
+        data.writeInt(logFileId);
+        data.writeInt(logFileOffset);
+    }
+
+    /** Writes this location (id then offset) to the given DataOutput. */
+    public void writeToDataOutput(DataOutput data) throws IOException {
+        data.writeInt(logFileId);
+        data.writeInt(logFileOffset);
+    }    
+
+    /** Reads a location (id then offset) from the given packet. */
+    static public Location readFromPacket(Packet packet) throws IOException {
+        PacketData data = new PacketData(packet);
+        return new Location(data.readInt(), data.readInt());
+    }
+
+    /** Reads a location (id then offset) from the given DataInput. */
+    public static Location readFromDataInput(DataInput data) throws IOException {
+        return new Location(data.readInt(), data.readInt());
+    }
+
+    
+}

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/Location.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFile.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFile.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFile.java (original)
+++ incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFile.java Tue Feb 21 15:12:56 2006
@@ -1,153 +1,153 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activeio.journal.active;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-/**
- * Allows read/append access to a LogFile.
- * 
- * @version $Revision: 1.1 $
- */
-final public class LogFile {
-
-    private final RandomAccessFile file;
-    private final FileChannel channel;
-
-    /** Prefered size. The size that the log file is set to when initilaized. */
-    private final int initialSize;
-
-    /** Where the we are in the file right now */
-    private int currentOffset;
-	private boolean disposed;
-    
-    public LogFile(File file, int initialSize) throws IOException {
-        this.initialSize = initialSize;
-        boolean initializationNeeeded = !file.exists();
-        this.file = new RandomAccessFile(file, "rw");
-        channel = this.file.getChannel();
-        if( initializationNeeeded )
-            resize();
-        channel.position(0);
-        reloadCurrentOffset();
-    }
-
-    /**
-     * To avoid doing un-needed seeks.
-     */
-    private void seek(int offset) throws IOException {
-        if( offset == currentOffset ) {
-            if( currentOffset != channel.position() )
-                throw new RuntimeException(" "+currentOffset+", "+channel.position() );                
-            return;
-        }
-        channel.position(offset);
-        currentOffset = offset;
-    }
-    private void reloadCurrentOffset() throws IOException {
-        currentOffset= (int) channel.position();
-    }
-    private void addToCurrentOffset(int rc) {
-        currentOffset+=rc;
-    }
-    
-    public boolean loadAndCheckRecord(int offset, Record record) throws IOException {
-        
-        try { 
-            // Read the next header
-            seek(offset);        
-            record.readHeader(file);
-                    
-            if (Record.isChecksumingEnabled()) {
-                record.checksum(file);
-            }            
-            // Load the footer.
-            seek(offset+record.getPayloadLength()+Record.RECORD_HEADER_SIZE);
-            record.readFooter(file);
-            
-            addToCurrentOffset(record.getRecordLength());
-            return true;
-                
-        } catch (IOException e) {
-            reloadCurrentOffset();
-            return false;
-        }
-    }
-    
-    public void resize() throws IOException {
-        file.setLength(initialSize);
-    }
-
-    public void force() throws IOException {
-        channel.force(false);
-    }
-
-    public void dispose() {
-    	if( disposed )
-    		return;
-    	disposed=true;
-        try {
-			this.file.close();
-		} catch (IOException e) {
-		}
-    }
-
-    public void write(int offset, ByteBuffer buffer) throws IOException {
-        
-        try {
-
-            int size = buffer.remaining();
-            seek(offset);
-            while (buffer.hasRemaining()) {
-                channel.write(buffer);                
-            }
-            addToCurrentOffset(size);
-            
-        } catch (IOException e) {
-            reloadCurrentOffset();
-        }
-    }
-
-    public void readRecordHeader(int offset, Record record) throws IOException {
-        seek(offset);  
-        try {
-            record.readHeader(file);
-        } catch ( IOException e ) {
-            reloadCurrentOffset();
-            throw e;
-        }
-        addToCurrentOffset(Record.RECORD_HEADER_SIZE);
-    }
-
-    public void read(int offset, byte[] answer) throws IOException {
-        seek(offset);
-        file.readFully(answer);
-        addToCurrentOffset(answer.length);
-    }
-
-    public void copyTo(File location) throws IOException {
-        FileOutputStream fos = new FileOutputStream(location);
-        channel.transferTo(0, channel.size(), fos.getChannel());
-        fos.getChannel().force(false);
-        fos.close();
-    }
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.journal.active;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Allows read/append access to a LogFile.
+ * 
+ * @version $Revision: 1.1 $
+ */
+final public class LogFile {
+
+    private final RandomAccessFile file;
+    private final FileChannel channel;
+
+    /** Preferred size. The size that the log file is set to when initialized. */
+    private final int initialSize;
+
+    /** Where we are in the file right now */
+    private int currentOffset;
+	private boolean disposed;
+    
+    public LogFile(File file, int initialSize) throws IOException {
+        this.initialSize = initialSize;
+        // A file that does not exist yet gets preallocated to its initial size.
+        boolean initializationNeeeded = !file.exists();
+        this.file = new RandomAccessFile(file, "rw");
+        channel = this.file.getChannel();
+        if( initializationNeeeded )
+            resize();
+        channel.position(0);
+        reloadCurrentOffset();
+    }
+
+    /**
+     * To avoid doing un-needed seeks.
+     */
+    private void seek(int offset) throws IOException {
+        if( offset == currentOffset ) {
+            // Sanity check: the cached offset must agree with the channel position.
+            if( currentOffset != channel.position() )
+                throw new RuntimeException(" "+currentOffset+", "+channel.position() );                
+            return;
+        }
+        channel.position(offset);
+        currentOffset = offset;
+    }
+    // Re-reads the channel position after a failure left currentOffset stale.
+    private void reloadCurrentOffset() throws IOException {
+        currentOffset= (int) channel.position();
+    }
+    private void addToCurrentOffset(int rc) {
+        currentOffset+=rc;
+    }
+    
+    /**
+     * Loads the record (header, optional checksum, footer) stored at the
+     * given offset.
+     *
+     * @return true when a complete record was read, false when the read failed
+     */
+    public boolean loadAndCheckRecord(int offset, Record record) throws IOException {
+        
+        try { 
+            // Read the next header
+            seek(offset);        
+            record.readHeader(file);
+                    
+            if (Record.isChecksumingEnabled()) {
+                record.checksum(file);
+            }            
+            // Load the footer.
+            seek(offset+record.getPayloadLength()+Record.RECORD_HEADER_SIZE);
+            record.readFooter(file);
+            
+            addToCurrentOffset(record.getRecordLength());
+            return true;
+                
+        } catch (IOException e) {
+            reloadCurrentOffset();
+            return false;
+        }
+    }
+    
+    /** Sets the file back to its preferred (initial) length. */
+    public void resize() throws IOException {
+        file.setLength(initialSize);
+    }
+
+    /** Forces pending writes to disk (file data only, not metadata). */
+    public void force() throws IOException {
+        channel.force(false);
+    }
+
+    /** Closes the underlying file; close failures are ignored (best effort). */
+    public void dispose() {
+    	if( disposed )
+    		return;
+    	disposed=true;
+        try {
+			this.file.close();
+		} catch (IOException e) {
+		}
+    }
+
+    /** Writes the buffer's remaining bytes at the given offset. */
+    public void write(int offset, ByteBuffer buffer) throws IOException {
+        
+        try {
+
+            int size = buffer.remaining();
+            seek(offset);
+            while (buffer.hasRemaining()) {
+                channel.write(buffer);                
+            }
+            addToCurrentOffset(size);
+            
+        } catch (IOException e) {
+            // Resync the cached offset, then propagate the failure: silently
+            // swallowing it would let the caller believe the data was written.
+            reloadCurrentOffset();
+            throw e;
+        }
+    }
+
+    /** Reads just the record header at the given offset into record. */
+    public void readRecordHeader(int offset, Record record) throws IOException {
+        seek(offset);  
+        try {
+            record.readHeader(file);
+        } catch ( IOException e ) {
+            reloadCurrentOffset();
+            throw e;
+        }
+        addToCurrentOffset(Record.RECORD_HEADER_SIZE);
+    }
+
+    /** Fills answer with the bytes stored at the given offset. */
+    public void read(int offset, byte[] answer) throws IOException {
+        seek(offset);
+        file.readFully(answer);
+        addToCurrentOffset(answer.length);
+    }
+
+    /** Copies the whole log file to the given location and forces it to disk. */
+    public void copyTo(File location) throws IOException {
+        FileOutputStream fos = new FileOutputStream(location);
+        channel.transferTo(0, channel.size(), fos.getChannel());
+        fos.getChannel().force(false);
+        fos.close();
+    }
 }

Propchange: incubator/activemq/trunk/activeio/activeio-core/src/main/java/org/apache/activeio/journal/active/LogFile.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message