activemq-commits mailing list archives

From: chir...@apache.org
Subject: svn commit: r825564 [3/5] - in /activemq/sandbox/activemq-flow: ./ activemq-util/src/main/java/org/apache/activemq/util/buffer/ hawtdb/ hawtdb/src/ hawtdb/src/main/ hawtdb/src/main/java/ hawtdb/src/main/java/org/ hawtdb/src/main/java/org/apache/ hawtdb...
Date: Thu, 15 Oct 2009 17:04:15 GMT
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+
+/**
+ * An optimized writer that batches appends to a data file. This object is
+ * thread safe, and its throughput improves as the number of concurrent writes
+ * increases.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DataFileAppender {
+
+    protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+    protected final Journal journal;
+    protected final Map<WriteKey, WriteCommand> inflightWrites;
+    protected final Object enqueueMutex = new Object() {
+    };
+    protected WriteBatch nextWriteBatch;
+
+    protected boolean shutdown;
+    protected IOException firstAsyncException;
+    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+    protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
+
+    private boolean running;
+    private Thread thread;
+
+    public static class WriteKey {
+        private final int file;
+        private final long offset;
+        private final int hash;
+
+        public WriteKey(Location item) {
+            file = item.getDataFileId();
+            offset = item.getOffset();
+            // TODO: see if we can build a better hash
+            hash = (int)(file ^ offset);
+        }
+
+        public int hashCode() {
+            return hash;
+        }
+
+        public boolean equals(Object obj) {
+            if (obj instanceof WriteKey) {
+                WriteKey di = (WriteKey)obj;
+                return di.file == file && di.offset == offset;
+            }
+            return false;
+        }
+    }
+
+    public class WriteBatch {
+
+        public final DataFile dataFile;
+
+        public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
+        public final CountDownLatch latch = new CountDownLatch(1);
+		private final int offset;
+        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+
+        public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
+            this.dataFile = dataFile;
+			this.offset = offset;
+            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
+            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            append(write);
+        }
+
+        public boolean canAppend(WriteCommand write) {
+            int newSize = size + write.location.getSize();
+			if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
+                return false;
+            }
+            return true;
+        }
+
+        public void append(WriteCommand write) throws IOException {
+            this.writes.addLast(write);
+            write.location.setDataFileId(dataFile.getDataFileId());
+            write.location.setOffset(offset+size);
+            int s = write.location.getSize();
+			size += s;
+            dataFile.incrementLength(s);
+            journal.addToTotalLength(s);
+        }
+    }
+
+    public static class WriteCommand extends LinkedNode<WriteCommand> {
+        public final Location location;
+        public final Buffer data;
+        final boolean sync;
+        public final Runnable onComplete;
+
+        public WriteCommand(Location location, Buffer data, boolean sync) {
+            this.location = location;
+            this.data = data;
+            this.sync = sync;
+            this.onComplete = null;
+        }
+
+        public WriteCommand(Location location, Buffer data, Runnable onComplete) {
+            this.location = location;
+            this.data = data;
+            this.onComplete = onComplete;
+            this.sync = false;
+        }
+    }
+
+    /**
+     * Construct a store writer.
+     * 
+     * @param dataManager the journal this appender writes to
+     */
+    public DataFileAppender(Journal dataManager) {
+        this.journal = dataManager;
+        this.inflightWrites = this.journal.getInflightWrites();
+    }
+
+    /**
+     * @param data
+     * @param type
+     * @param sync
+     * @return the location the data was stored at
+     * @throws IOException
+     */
+    public Location storeItem(Buffer data, byte type, boolean sync) throws IOException {
+
+        // Write the packet to our internal buffer.
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        WriteBatch batch;
+        WriteCommand write = new WriteCommand(location, data, sync);
+
+        // Locate the data file and enqueue in a synchronized block so that
+        // writes are enqueued onto the executor in the order they were
+        // assigned by the data manager (which is basically just appending).
+
+        synchronized (this) {
+            batch = enqueue(write);
+        }
+        location.setLatch(batch.latch);
+        if (sync) {
+            try {
+                batch.latch.await();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+        }	
+
+        return location;
+    }
+
+    public Location storeItem(Buffer data, byte type, Runnable onComplete) throws IOException {
+        // Write the packet to our internal buffer.
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        WriteBatch batch;
+        WriteCommand write = new WriteCommand(location, data, onComplete);
+
+        synchronized (this) {
+            batch = enqueue(write);
+        }
+ 
+        location.setLatch(batch.latch);
+        return location;
+    }
+
+    private WriteBatch enqueue(WriteCommand write) throws IOException {
+        synchronized (enqueueMutex) {
+            if (shutdown) {
+                throw new IOException("Async Writer Thread Shutdown");
+            }
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
+            }
+
+            if (!running) {
+                running = true;
+                thread = new Thread() {
+                    public void run() {
+                        processQueue();
+                    }
+                };
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.setDaemon(true);
+                thread.setName("ActiveMQ Data File Writer");
+                thread.start();
+            }
+
+            while ( true ) {
+	            if (nextWriteBatch == null) {
+	            	DataFile file = journal.getCurrentWriteFile();
+	            	if( file.getLength() > journal.getMaxFileLength() ) {
+	            		file = journal.rotateWriteFile();
+	            	}
+	            	
+	                nextWriteBatch = new WriteBatch(file, file.getLength(), write);
+	                enqueueMutex.notify();
+	                break;
+	            } else {
+	                // Append to current batch if possible..
+	                if (nextWriteBatch.canAppend(write)) {
+	                    nextWriteBatch.append(write);
+	                    break;
+	                } else {
+                        // Otherwise wait for nextWriteBatch to become null
+	                    try {
+	                        while (nextWriteBatch != null) {
+	                            enqueueMutex.wait();
+	                        }
+	                    } catch (InterruptedException e) {
+	                        throw new InterruptedIOException();
+	                    }
+	                    if (shutdown) {
+                            throw new IOException("Async Writer Thread Shutdown");
+	                    }
+	                }
+	            }
+            }
+            if (!write.sync) {
+                inflightWrites.put(new WriteKey(write.location), write);
+            }
+            return nextWriteBatch;
+        }
+    }
+
+    public void close() throws IOException {
+        synchronized (enqueueMutex) {
+            if (!shutdown) {
+                shutdown = true;
+                if (running) {
+                    enqueueMutex.notifyAll();
+                } else {
+                    shutdownDone.countDown();
+                }
+            }
+        }
+
+        try {
+            shutdownDone.await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+
+    }
+
+    /**
+     * The async processing loop that writes to the data files and issues the
+     * force calls. Since the file sync() call is the slowest of all the
+     * operations, this algorithm tries to 'batch' or group together several
+     * file sync() requests into a single file sync() call. The batching is
+     * accomplished by attaching the same CountDownLatch instance to every
+     * force request in a group.
+     */
+    protected void processQueue() {
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        try {
+
+            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (nextWriteBatch != null) {
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        if (shutdown) {
+                            return;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notify();
+                }
+
+                WriteBatch wb = (WriteBatch)o;
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        file.setLength(dataFile.getLength());
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile();
+                    if( file.length() < journal.preferedFileLength ) {
+                        file.setLength(journal.preferedFileLength);
+                    }
+                }
+
+                WriteCommand write = wb.writes.getHead();
+
+                // Write an empty batch control record.
+                buff.reset();
+                buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
+                buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
+                buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
+                buff.writeInt(0);
+                buff.writeLong(0);
+                
+                boolean forceToDisk = false;
+                while (write != null) {
+                    forceToDisk |= write.sync | write.onComplete != null;
+                    buff.writeInt(write.location.getSize());
+                    buff.writeByte(write.location.getType());
+                    buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                    write = write.getNext();
+                }
+
+                Buffer sequence = buff.toBuffer();
+                
+                // Now we can fill in the batch control record properly. 
+                buff.reset();
+                buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+                buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+                if( journal.isChecksum() ) {
+	                Checksum checksum = new Adler32();
+	                checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+	                buff.writeLong(checksum.getValue());
+                }
+
+                // Now do the 1 big write.
+                file.seek(wb.offset);
+                file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                
+                ReplicationTarget replicationTarget = journal.getReplicationTarget();
+                if( replicationTarget!=null ) {
+                	replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+                }
+                
+                if (forceToDisk) {
+                    IOHelper.sync(file.getFD());
+                }
+
+                WriteCommand lastWrite = wb.writes.getTail();
+                journal.setLastAppendLocation(lastWrite.location);
+
+                // Now that the data is on disk, remove the writes from the
+                // in-flight cache.
+                write = wb.writes.getHead();
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    if (write.onComplete != null) {
+                        try {
+                            write.onComplete.run();
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    write = write.getNext();
+                }
+
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
+            }
+        } catch (IOException e) {
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+            }
+        } catch (InterruptedException e) {
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (Throwable ignore) {
+            }
+            shutdownDone.countDown();
+        }
+    }
+
+}
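
For reference, the batch layout that processQueue() writes can be summed up as: a 28 byte control record (RECORD_HEAD_SPACE of 5 bytes, the 11 byte "WRITE BATCH" magic, a 4 byte payload size and an 8 byte checksum) followed by the user records, with the payload size and Adler32 checksum back-filled once the batch is assembled. The standalone sketch below is illustrative only and not part of this commit; the class and method names are hypothetical, and the real appender writes the checksum only when journal.isChecksum() is enabled.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

public class BatchLayoutSketch {

    static final byte[] MAGIC = "WRITE BATCH".getBytes(StandardCharsets.UTF_8);
    static final int RECORD_HEAD_SPACE = 4 + 1;                 // record length int + record type byte
    static final int BATCH_CONTROL_RECORD_SIZE =
            RECORD_HEAD_SPACE + MAGIC.length + 4 + 8;           // 5 + 11 + 4 + 8 = 28 bytes

    /** Serialize one batch of user records the way the appender thread lays them out. */
    static byte[] encodeBatch(byte[][] payloads) {
        int total = BATCH_CONTROL_RECORD_SIZE;
        for (byte[] p : payloads) {
            total += RECORD_HEAD_SPACE + p.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);

        // Empty batch control record; payload size and checksum are back-filled below.
        buf.putInt(BATCH_CONTROL_RECORD_SIZE);
        buf.put((byte) 2);                                      // BATCH_CONTROL_RECORD_TYPE
        buf.put(MAGIC);
        int fillPos = buf.position();
        buf.putInt(0);
        buf.putLong(0L);

        // Each user record: header-inclusive size, type byte, then the payload.
        for (byte[] p : payloads) {
            buf.putInt(RECORD_HEAD_SPACE + p.length);
            buf.put((byte) 1);                                  // USER_RECORD_TYPE
            buf.put(p);
        }

        // Back-fill the batch payload size and its Adler32 checksum.
        int payloadSize = total - BATCH_CONTROL_RECORD_SIZE;
        Checksum checksum = new Adler32();
        checksum.update(buf.array(), BATCH_CONTROL_RECORD_SIZE, payloadSize);
        buf.putInt(fillPos, payloadSize);
        buf.putLong(fillPos + 4, checksum.getValue());
        return buf.array();
    }
}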

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.activemq.util.Scheduler;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.DataByteArrayInputStream;
+import org.apache.activemq.util.list.LinkedNodeList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawtdb.internal.journal.DataFileAppender.WriteCommand;
+import org.apache.hawtdb.internal.journal.DataFileAppender.WriteKey;
+
+/**
+ * Manages DataFiles
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Journal {
+
+    private static final int MAX_BATCH_SIZE = 32*1024*1024;
+
+    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
+    public static final int RECORD_HEAD_SPACE = 4 + 1;
+    
+    public static final byte USER_RECORD_TYPE = 1;
+    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
+    // The batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
+    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
+    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
+    
+    public static final String DEFAULT_DIRECTORY = ".";
+    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
+    public static final String DEFAULT_FILE_PREFIX = "db-";
+    public static final String DEFAULT_FILE_SUFFIX = ".log";
+    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
+    public static final int PREFERED_DIFF = 1024 * 512;
+
+    private static final Log LOG = LogFactory.getLog(Journal.class);
+
+    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+
+    protected File directory = new File(DEFAULT_DIRECTORY);
+    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
+    protected String filePrefix = DEFAULT_FILE_PREFIX;
+    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
+    protected boolean started;
+    
+    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
+
+    protected DataFileAppender appender;
+    protected DataFileAccessorPool accessorPool;
+
+    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
+    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
+
+    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+    protected Runnable cleanupTask;
+    protected final AtomicLong totalLength = new AtomicLong();
+    protected boolean archiveDataLogs;
+	private ReplicationTarget replicationTarget;
+    protected boolean checksum;
+
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+        
+        long start = System.currentTimeMillis();
+        accessorPool = new DataFileAccessorPool(this);
+        started = true;
+        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
+
+        appender = new DataFileAppender(this);
+
+        File[] files = directory.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
+            }
+        });
+
+        if (files != null) {
+            for (int i = 0; i < files.length; i++) {
+                try {
+                    File file = files[i];
+                    String n = file.getName();
+                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
+                    int num = Integer.parseInt(numStr);
+                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
+                    fileMap.put(dataFile.getDataFileId(), dataFile);
+                    totalLength.addAndGet(dataFile.getLength());
+                } catch (NumberFormatException e) {
+                    // Ignore files that do not match the pattern.
+                }
+            }
+
+            // Sort the list so that we can link the DataFiles together in the
+            // right order.
+            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+            Collections.sort(l);
+            for (DataFile df : l) {
+                dataFiles.addLast(df);
+                fileByFileMap.put(df.getFile(), df);
+            }
+        }
+
+    	getCurrentWriteFile();
+        try {
+        	Location l = recoveryCheck(dataFiles.getTail());
+            lastAppendLocation.set(l);
+        } catch (IOException e) {
+            LOG.warn("recovery check failed", e);
+        }
+        
+        cleanupTask = new Runnable() {
+            public void run() {
+                cleanup();
+            }
+        };
+        Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+        long end = System.currentTimeMillis();
+        LOG.trace("Startup took: "+(end-start)+" ms");
+    }
+
+    private static byte[] bytes(String string) {
+    	try {
+			return string.getBytes("UTF-8");
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	protected Location recoveryCheck(DataFile dataFile) throws IOException {
+    	byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+    	DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
+    	
+        Location location = new Location();
+        location.setDataFileId(dataFile.getDataFileId());
+        location.setOffset(0);
+
+    	DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        try {
+            while( true ) {
+	        	reader.read(location.getOffset(), controlRecord);
+	        	controlIs.restart();
+	        	
+	        	// Assert that it's  a batch record.
+	        	if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
+	        		break;
+	        	}
+	        	if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE ) {
+	        		break;
+	        	}
+	        	for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length; i++ ) {
+	        		if( controlIs.readByte() != BATCH_CONTROL_RECORD_MAGIC[i] ) {
+	        			break;
+	        		}
+	        	}
+	        	
+	        	int size = controlIs.readInt();
+	        	if( size > MAX_BATCH_SIZE ) {
+	        		break;
+	        	}
+	        	
+	        	if( isChecksum() ) {
+		        	
+	        		long expectedChecksum = controlIs.readLong();	        	
+		        	
+	        		byte data[] = new byte[size];
+		        	reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
+		        	
+		        	Checksum checksum = new Adler32();
+	                checksum.update(data, 0, data.length);
+	                
+	                if( expectedChecksum!=checksum.getValue() ) {
+	                	break;
+	                }
+	                
+	        	}
+                
+	        	
+                location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+            }
+            
+        } catch (IOException e) {
+		} finally {
+            accessorPool.closeDataFileAccessor(reader);
+        }
+        
+        dataFile.setLength(location.getOffset());
+        return location;
+    }
+
+	void addToTotalLength(int size) {
+		totalLength.addAndGet(size);
+	}
+    
+    
+    synchronized DataFile getCurrentWriteFile() throws IOException {
+        if (dataFiles.isEmpty()) {
+            rotateWriteFile();
+        }
+        return dataFiles.getTail();
+    }
+
+    synchronized DataFile rotateWriteFile() {
+		int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+		File file = getFile(nextNum);
+		DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+		// actually allocate the disk space
+		fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+		fileByFileMap.put(file, nextWriteFile);
+		dataFiles.addLast(nextWriteFile);
+		return nextWriteFile;
+	}
+
+	public File getFile(int nextNum) {
+		String fileName = filePrefix + nextNum + fileSuffix;
+		File file = new File(directory, fileName);
+		return file;
+	}
+
+    synchronized DataFile getDataFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
+        }
+        return dataFile;
+    }
+
+    synchronized File getFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
+        }
+        return dataFile.getFile();
+    }
+
+    private DataFile getNextDataFile(DataFile dataFile) {
+        return dataFile.getNext();
+    }
+
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        Scheduler.cancel(cleanupTask);
+        accessorPool.close();
+        appender.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        dataFiles.clear();
+        lastAppendLocation.set(null);
+        started = false;
+    }
+
+    synchronized void cleanup() {
+        if (accessorPool != null) {
+            accessorPool.disposeUnused();
+        }
+    }
+
+    public synchronized boolean delete() throws IOException {
+
+        // Close all open file handles...
+        appender.close();
+        accessorPool.close();
+
+        boolean result = true;
+        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = i.next();
+            totalLength.addAndGet(-dataFile.getLength());
+            result &= dataFile.delete();
+        }
+        fileMap.clear();
+        fileByFileMap.clear();
+        lastAppendLocation.set(null);
+        dataFiles = new LinkedNodeList<DataFile>();
+
+        // reopen open file handles...
+        accessorPool = new DataFileAccessorPool(this);
+        appender = new DataFileAppender(this);
+        return result;
+    }
+
+    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
+        for (Integer key : files) {
+            // Can't remove the data file (or subsequent files) that is currently being written to.
+        	if( key >= lastAppendLocation.get().getDataFileId() ) {
+        		continue;
+        	}
+            DataFile dataFile = fileMap.get(key);
+            if( dataFile!=null ) {
+            	forceRemoveDataFile(dataFile);
+            }
+        }
+    }
+
+    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
+        accessorPool.disposeDataFileAccessors(dataFile);
+        fileByFileMap.remove(dataFile.getFile());
+        fileMap.remove(dataFile.getDataFileId());
+        totalLength.addAndGet(-dataFile.getLength());
+        dataFile.unlink();
+        if (archiveDataLogs) {
+            dataFile.move(getDirectoryArchive());
+            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
+        } else {
+            if ( dataFile.delete() ) {
+            	LOG.debug("Discarded data file " + dataFile);
+            } else {
+            	LOG.warn("Failed to discard data file " + dataFile.getFile());
+            }
+        }
+    }
+
+    /**
+     * @return the maxFileLength
+     */
+    public int getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    /**
+     * @param maxFileLength the maxFileLength to set
+     */
+    public void setMaxFileLength(int maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public String toString() {
+        return directory.toString();
+    }
+
+	public synchronized void appendedExternally(Location loc, int length) throws IOException {
+		DataFile dataFile = null;
+		if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
+			// It's an update to the current log file..
+			dataFile = dataFiles.getTail();
+			dataFile.incrementLength(length);
+		} else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
+			// It's an update to the next log file.
+            int nextNum = loc.getDataFileId();
+            File file = getFile(nextNum);
+            dataFile = new DataFile(file, nextNum, preferedFileLength);
+            // actually allocate the disk space
+            fileMap.put(dataFile.getDataFileId(), dataFile);
+            fileByFileMap.put(file, dataFile);
+            dataFiles.addLast(dataFile);
+		} else {
+			throw new IOException("Invalid external append.");
+		}
+	}
+
+    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (location == null) {
+                    DataFile head = dataFiles.getHead();
+                    if( head == null ) {
+                    	return null;
+                    }
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    if (location.getSize() == -1) {
+                        cur = new Location(location);
+                    } else {
+                        cur = new Location(location);
+                        cur.setOffset(location.getOffset() + location.getSize());
+                    }
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+            DataFile dataFile = getDataFile(cur);
+
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                dataFile = getNextDataFile(dataFile);
+                if (dataFile == null) {
+                    return null;
+                } else {
+                    cur.setDataFileId(dataFile.getDataFileId().intValue());
+                    cur.setOffset(0);
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+				reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() == USER_RECORD_TYPE) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
+
+    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
+        DataFile df = fileByFileMap.get(file);
+        return getNextLocation(df, lastLocation, thisFileOnly);
+    }
+
+    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (lastLocation == null) {
+                    DataFile head = dataFile.getHeadNode();
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    cur = new Location(lastLocation);
+                    cur.setOffset(cur.getOffset() + cur.getSize());
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                if (thisFileOnly) {
+                    return null;
+                } else {
+                    dataFile = getNextDataFile(dataFile);
+                    if (dataFile == null) {
+                        return null;
+                    } else {
+                        cur.setDataFileId(dataFile.getDataFileId().intValue());
+                        cur.setOffset(0);
+                    }
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() > 0) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
+
+    public synchronized Buffer read(Location location) throws IOException, IllegalStateException {
+        DataFile dataFile = getDataFile(location);
+        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        Buffer rc = null;
+        try {
+            rc = reader.readRecord(location);
+        } finally {
+            accessorPool.closeDataFileAccessor(reader);
+        }
+        return rc;
+    }
+
+    public synchronized Location write(Buffer data, boolean sync) throws IOException, IllegalStateException {
+        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
+        return loc;
+    }
+
+    public synchronized Location write(Buffer data, Runnable onComplete) throws IOException, IllegalStateException {
+        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
+        return loc;
+    }
+
+    public void update(Location location, Buffer data, boolean sync) throws IOException {
+        DataFile dataFile = getDataFile(location);
+        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
+        try {
+            updater.updateRecord(location, data, sync);
+        } finally {
+            accessorPool.closeDataFileAccessor(updater);
+        }
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public String getFilePrefix() {
+        return filePrefix;
+    }
+
+    public void setFilePrefix(String filePrefix) {
+        this.filePrefix = filePrefix;
+    }
+
+    public Map<WriteKey, WriteCommand> getInflightWrites() {
+        return inflightWrites;
+    }
+
+    public Location getLastAppendLocation() {
+        return lastAppendLocation.get();
+    }
+
+    public void setLastAppendLocation(Location lastSyncedLocation) {
+        this.lastAppendLocation.set(lastSyncedLocation);
+    }
+
+    public File getDirectoryArchive() {
+        return directoryArchive;
+    }
+
+    public void setDirectoryArchive(File directoryArchive) {
+        this.directoryArchive = directoryArchive;
+    }
+
+    public boolean isArchiveDataLogs() {
+        return archiveDataLogs;
+    }
+
+    public void setArchiveDataLogs(boolean archiveDataLogs) {
+        this.archiveDataLogs = archiveDataLogs;
+    }
+
+    synchronized public Integer getCurrentDataFileId() {
+        if (dataFiles.isEmpty())
+            return null;
+        return dataFiles.getTail().getDataFileId();
+    }
+
+    /**
+     * Get a set of files - only valid after start()
+     * 
+     * @return files currently being used
+     */
+    public Set<File> getFiles() {
+        return fileByFileMap.keySet();
+    }
+
+    public Map<Integer, DataFile> getFileMap() {
+        return new TreeMap<Integer, DataFile>(fileMap);
+    }
+    
+    public long getDiskSize() {
+        long tailLength=0;
+        synchronized( this ) {
+            if( !dataFiles.isEmpty() ) {
+                tailLength = dataFiles.getTail().getLength();
+            }
+        }
+        
+        long rc = totalLength.get();
+        
+        // The last file is actually at a minimum preferedFileLength big.
+        if( tailLength < preferedFileLength ) {
+            rc -= tailLength;
+            rc += preferedFileLength;
+        }
+        return rc;
+    }
+
+	public void setReplicationTarget(ReplicationTarget replicationTarget) {
+		this.replicationTarget = replicationTarget;
+	}
+	public ReplicationTarget getReplicationTarget() {
+		return replicationTarget;
+	}
+
+    public String getFileSuffix() {
+        return fileSuffix;
+    }
+
+    public void setFileSuffix(String fileSuffix) {
+        this.fileSuffix = fileSuffix;
+    }
+
+	public boolean isChecksum() {
+		return checksum;
+	}
+
+	public void setChecksum(boolean checksumWrites) {
+		this.checksum = checksumWrites;
+	}
+
+
+}
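
As a rough usage sketch of the Journal API added above (illustrative only, not part of the commit; the directory path is hypothetical and the Buffer(byte[]) constructor is assumed to exist in the activemq-util buffer package), a caller would configure, start, write and replay the log roughly like this:

import java.io.File;

import org.apache.activemq.util.buffer.Buffer;
import org.apache.hawtdb.internal.journal.Journal;
import org.apache.hawtdb.internal.journal.Location;

public class JournalUsageSketch {
    public static void main(String[] args) throws Exception {
        Journal journal = new Journal();
        journal.setDirectory(new File("target/journal-sketch")); // hypothetical directory
        journal.setChecksum(true);
        journal.start();
        try {
            // Append a record; sync=true blocks until the containing batch is forced to disk.
            Location loc = journal.write(new Buffer("hello".getBytes("UTF-8")), true);

            // Read the record back by its location.
            Buffer data = journal.read(loc);
            System.out.println(new String(data.getData(), data.getOffset(), data.getLength(), "UTF-8"));

            // Replay every user record from the head of the journal.
            for (Location cur = journal.getNextLocation(null); cur != null; cur = journal.getNextLocation(cur)) {
                Buffer record = journal.read(cur);
                // process record ...
            }
        } finally {
            journal.close();
        }
    }
}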

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Used as a location in the data store.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public final class Location implements Comparable<Location> {
+
+    public static final byte USER_TYPE = 1;
+    public static final byte NOT_SET_TYPE = 0;
+    public static final int NOT_SET = -1;
+
+    private int dataFileId = NOT_SET;
+    private int offset = NOT_SET;
+    private int size = NOT_SET;
+    private byte type = NOT_SET_TYPE;
+    private CountDownLatch latch;
+
+    public Location() {
+    }
+
+    public Location(Location item) {
+        this.dataFileId = item.dataFileId;
+        this.offset = item.offset;
+        this.size = item.size;
+        this.type = item.type;
+    }
+
+    public Location(int dataFileId, int offset) {
+        this.dataFileId=dataFileId;
+        this.offset=offset;
+    }
+
+    boolean isValid() {
+        return dataFileId != NOT_SET;
+    }
+
+    /**
+     * @return the size of the data record including the header.
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @param size the size of the data record including the header.
+     */
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    public int getOffset() {
+        return offset;
+    }
+
+    public void setOffset(int offset) {
+        this.offset = offset;
+    }
+
+    public int getDataFileId() {
+        return dataFileId;
+    }
+
+    public void setDataFileId(int file) {
+        this.dataFileId = file;
+    }
+
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    public String toString() {
+        return dataFileId+":"+offset;
+    }
+
+    public void writeExternal(DataOutput dos) throws IOException {
+        dos.writeInt(dataFileId);
+        dos.writeInt(offset);
+        dos.writeInt(size);
+        dos.writeByte(type);
+    }
+
+    public void readExternal(DataInput dis) throws IOException {
+        dataFileId = dis.readInt();
+        offset = dis.readInt();
+        size = dis.readInt();
+        type = dis.readByte();
+    }
+
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public int compareTo(Location o) {
+        if (dataFileId == o.dataFileId) {
+            return offset - o.offset;
+        }
+        return dataFileId - o.dataFileId;
+    }
+
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof Location) {
+            result = compareTo((Location)o) == 0;
+        }
+        return result;
+    }
+
+    public int hashCode() {
+        return dataFileId ^ offset;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java
------------------------------------------------------------------------------
    svn:executable = *
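
A small round-trip sketch, not part of the commit, showing that writeExternal()/readExternal() marshal a Location to a fixed 13 bytes (4 byte data file id, 4 byte offset, 4 byte size, 1 byte type); the class name is hypothetical:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hawtdb.internal.journal.Location;

public class LocationCodecSketch {
    public static void main(String[] args) throws IOException {
        Location original = new Location(3, 128); // data file 3, offset 128
        original.setSize(64);
        original.setType(Location.USER_TYPE);

        // Marshal: 4 byte file id + 4 byte offset + 4 byte size + 1 byte type = 13 bytes.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.writeExternal(new DataOutputStream(bytes));

        // Unmarshal into a fresh instance.
        Location copy = new Location();
        copy.readExternal(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy + " size=" + copy.getSize()); // prints "3:128 size=64"
    }
}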

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * Allows you to open a data file in read only mode.  Useful when working with 
+ * archived data files.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ReadOnlyDataFile extends DataFile {
+
+    ReadOnlyDataFile(File file, int number, int preferedSize) {
+        super(file, number, preferedSize);
+    }
+    
+    public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+        return new RandomAccessFile(file, "r");
+    }
+
+    public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+        file.close();
+    }
+
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+    
+    public synchronized void move(File targetDirectory) throws IOException{
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Journal that works in read-only mode against multiple data directories.
+ * Useful for reading back archived data files.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ReadOnlyJournal extends Journal {
+    
+    private final ArrayList<File> dirs;
+
+    public ReadOnlyJournal(final ArrayList<File> dirs) {
+        this.dirs = dirs;
+    }
+
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
+        started = true;
+                
+        ArrayList<File> files = new ArrayList<File>();
+        for (File directory : dirs) {
+            final File d = directory;
+            File[] f = d.listFiles(new FilenameFilter() {
+                public boolean accept(File dir, String n) {
+                    return dir.equals(d) && n.startsWith(filePrefix);
+                }
+            });
+            for (int i = 0; i < f.length; i++) {
+                files.add(f[i]);
+            }
+        }
+       
+        for (File file : files) {
+            try {
+                String n = file.getName();
+                String numStr = n.substring(filePrefix.length(), n.length());
+                int num = Integer.parseInt(numStr);
+                DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+                fileMap.put(dataFile.getDataFileId(), dataFile);
+                totalLength.addAndGet(dataFile.getLength());
+            } catch (NumberFormatException e) {
+                // Ignore files that do not match the pattern.
+            }
+        }
+
+        // Sort the list so that we can link the DataFiles together in the
+        // right order.
+        List<DataFile> list = new ArrayList<DataFile>(fileMap.values());
+        Collections.sort(list);
+        for (DataFile df : list) {
+            dataFiles.addLast(df);
+            fileByFileMap.put(df.getFile(), df);
+        }
+        
+//        // Need to check the current Write File to see if there was a partial
+//        // write to it.
+//        if (!dataFiles.isEmpty()) {
+//
+//            // See if the lastSyncedLocation is valid..
+//            Location l = lastAppendLocation.get();
+//            if (l != null && l.getDataFileId() != dataFiles.getTail().getDataFileId().intValue()) {
+//                l = null;
+//            }
+//            
+//            // If we know the last location that was ok.. then we can skip lots
+//            // of checking
+//            try {
+//                l = recoveryCheck(dataFiles.getTail(), l);
+//                lastAppendLocation.set(l);
+//            } catch (IOException e) {
+//                LOG.warn("recovery check failed", e);
+//            }
+//        }
+    }
+    
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        accessorPool.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        started = false;
+    }
+
+    
+    public Location getFirstLocation() throws IllegalStateException, IOException {
+        if( dataFiles.isEmpty() ) {
+            return null;
+        }
+        
+        DataFile first = dataFiles.getHead();
+        Location cur = new Location();
+        cur.setDataFileId(first.getDataFileId());
+        cur.setOffset(0);
+        cur.setSize(0);
+        return getNextLocation(cur);
+    }
+    
+    @Override
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Cannot delete a ReadOnlyJournal");
+    }    
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface ReplicationTarget {
+
+	void replicate(Location location, Buffer sequence, boolean sync);
+
+}
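
A minimal, hypothetical implementation sketch for the interface above: DataFileAppender.processQueue() invokes replicate() with the location of the first write in the batch, the serialized batch buffer and whether the batch was forced to disk, so a target can forward or simply log each batch:

import org.apache.activemq.util.buffer.Buffer;
import org.apache.hawtdb.internal.journal.Journal;
import org.apache.hawtdb.internal.journal.Location;
import org.apache.hawtdb.internal.journal.ReplicationTarget;

/**
 * Hypothetical target that just logs each batch it is asked to replicate.
 */
public class LoggingReplicationTarget implements ReplicationTarget {

    public void replicate(Location location, Buffer sequence, boolean sync) {
        System.out.println("replicate batch @ " + location
                + " (" + sequence.getLength() + " bytes, forced=" + sync + ")");
    }

    public static void wire(Journal journal) {
        // The appender picks the target up via journal.getReplicationTarget().
        journal.setReplicationTarget(new LoggingReplicationTarget());
    }
}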

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,23 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+journal based data storage - scalable and fast
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html
------------------------------------------------------------------------------
    svn:executable = *


