Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAppender.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+
+/**
+ * An optimized writer to do batch appends to a data file. This object is thread
+ * safe and gains throughput as you increase the number of concurrent writes it
+ * does.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DataFileAppender {
+
+ protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+ protected final Journal journal;
+ protected final Map<WriteKey, WriteCommand> inflightWrites;
+ protected final Object enqueueMutex = new Object() {
+ };
+ protected WriteBatch nextWriteBatch;
+
+ protected boolean shutdown;
+ protected IOException firstAsyncException;
+ protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+ protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
+
+ private boolean running;
+ private Thread thread;
+
+ public static class WriteKey {
+ private final int file;
+ private final long offset;
+ private final int hash;
+
+ public WriteKey(Location item) {
+ file = item.getDataFileId();
+ offset = item.getOffset();
+ // TODO: see if we can build a better hash
+ hash = (int)(file ^ offset);
+ }
+
+ public int hashCode() {
+ return hash;
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof WriteKey) {
+ WriteKey di = (WriteKey)obj;
+ return di.file == file && di.offset == offset;
+ }
+ return false;
+ }
+ }
+
+ public class WriteBatch {
+
+ public final DataFile dataFile;
+
+ public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
+ public final CountDownLatch latch = new CountDownLatch(1);
+ private final int offset;
+ public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+
+ public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
+ this.dataFile = dataFile;
+ this.offset = offset;
+ this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+ this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
+ journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+ append(write);
+ }
+
+ public boolean canAppend(WriteCommand write) {
+ int newSize = size + write.location.getSize();
+ if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
+ return false;
+ }
+ return true;
+ }
+
+ public void append(WriteCommand write) throws IOException {
+ this.writes.addLast(write);
+ write.location.setDataFileId(dataFile.getDataFileId());
+ write.location.setOffset(offset+size);
+ int s = write.location.getSize();
+ size += s;
+ dataFile.incrementLength(s);
+ journal.addToTotalLength(s);
+ }
+ }
+
+ public static class WriteCommand extends LinkedNode<WriteCommand> {
+ public final Location location;
+ public final Buffer data;
+ final boolean sync;
+ public final Runnable onComplete;
+
+ public WriteCommand(Location location, Buffer data, boolean sync) {
+ this.location = location;
+ this.data = data;
+ this.sync = sync;
+ this.onComplete = null;
+ }
+
+ public WriteCommand(Location location, Buffer data, Runnable onComplete) {
+ this.location = location;
+ this.data = data;
+ this.onComplete = onComplete;
+ this.sync = false;
+ }
+ }
+
+ /**
+ * Construct a Store writer
+ *
+ * @param fileId
+ */
+ public DataFileAppender(Journal dataManager) {
+ this.journal = dataManager;
+ this.inflightWrites = this.journal.getInflightWrites();
+ }
+
+ /**
+ * @param type
+ * @param marshaller
+ * @param payload
+ * @param type
+ * @param sync
+ * @return
+ * @throws IOException
+ * @throws
+ * @throws
+ */
+ public Location storeItem(Buffer data, byte type, boolean sync) throws IOException {
+
+ // Write the packet our internal buffer.
+ int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+ final Location location = new Location();
+ location.setSize(size);
+ location.setType(type);
+
+ WriteBatch batch;
+ WriteCommand write = new WriteCommand(location, data, sync);
+
+ // Locate datafile and enqueue into the executor in sychronized block so
+ // that writes get equeued onto the executor in order that they were
+ // assigned
+ // by the data manager (which is basically just appending)
+
+ synchronized (this) {
+ batch = enqueue(write);
+ }
+ location.setLatch(batch.latch);
+ if (sync) {
+ try {
+ batch.latch.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+
+ return location;
+ }
+
+ public Location storeItem(Buffer data, byte type, Runnable onComplete) throws IOException {
+ // Write the packet our internal buffer.
+ int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+ final Location location = new Location();
+ location.setSize(size);
+ location.setType(type);
+
+ WriteBatch batch;
+ WriteCommand write = new WriteCommand(location, data, onComplete);
+
+ synchronized (this) {
+ batch = enqueue(write);
+ }
+
+ location.setLatch(batch.latch);
+ return location;
+ }
+
+ private WriteBatch enqueue(WriteCommand write) throws IOException {
+ synchronized (enqueueMutex) {
+ if (shutdown) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+ if (firstAsyncException != null) {
+ throw firstAsyncException;
+ }
+
+ if (!running) {
+ running = true;
+ thread = new Thread() {
+ public void run() {
+ processQueue();
+ }
+ };
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.setDaemon(true);
+ thread.setName("ActiveMQ Data File Writer");
+ thread.start();
+ }
+
+ while ( true ) {
+ if (nextWriteBatch == null) {
+ DataFile file = journal.getCurrentWriteFile();
+ if( file.getLength() > journal.getMaxFileLength() ) {
+ file = journal.rotateWriteFile();
+ }
+
+ nextWriteBatch = new WriteBatch(file, file.getLength(), write);
+ enqueueMutex.notify();
+ break;
+ } else {
+ // Append to current batch if possible..
+ if (nextWriteBatch.canAppend(write)) {
+ nextWriteBatch.append(write);
+ break;
+ } else {
+ // Otherwise wait for the queuedCommand to be null
+ try {
+ while (nextWriteBatch != null) {
+ enqueueMutex.wait();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ if (shutdown) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+ }
+ }
+ }
+ if (!write.sync) {
+ inflightWrites.put(new WriteKey(write.location), write);
+ }
+ return nextWriteBatch;
+ }
+ }
+
+ public void close() throws IOException {
+ synchronized (enqueueMutex) {
+ if (!shutdown) {
+ shutdown = true;
+ if (running) {
+ enqueueMutex.notifyAll();
+ } else {
+ shutdownDone.countDown();
+ }
+ }
+ }
+
+ try {
+ shutdownDone.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+
+ }
+
+ /**
+ * The async processing loop that writes to the data files and does the
+ * force calls. Since the file sync() call is the slowest of all the
+ * operations, this algorithm tries to 'batch' or group together several
+ * file sync() requests into a single file sync() call. The batching is
+ * accomplished attaching the same CountDownLatch instance to every force
+ * request in a group.
+ */
+ protected void processQueue() {
+ DataFile dataFile = null;
+ RandomAccessFile file = null;
+ try {
+
+ DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
+ while (true) {
+
+ Object o = null;
+
+ // Block till we get a command.
+ synchronized (enqueueMutex) {
+ while (true) {
+ if (nextWriteBatch != null) {
+ o = nextWriteBatch;
+ nextWriteBatch = null;
+ break;
+ }
+ if (shutdown) {
+ return;
+ }
+ enqueueMutex.wait();
+ }
+ enqueueMutex.notify();
+ }
+
+ WriteBatch wb = (WriteBatch)o;
+ if (dataFile != wb.dataFile) {
+ if (file != null) {
+ file.setLength(dataFile.getLength());
+ dataFile.closeRandomAccessFile(file);
+ }
+ dataFile = wb.dataFile;
+ file = dataFile.openRandomAccessFile();
+ if( file.length() < journal.preferedFileLength ) {
+ file.setLength(journal.preferedFileLength);
+ }
+ }
+
+ WriteCommand write = wb.writes.getHead();
+
+ // Write an empty batch control record.
+ buff.reset();
+ buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
+ buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
+ buff.writeInt(0);
+ buff.writeLong(0);
+
+ boolean forceToDisk = false;
+ while (write != null) {
+ forceToDisk |= write.sync | write.onComplete != null;
+ buff.writeInt(write.location.getSize());
+ buff.writeByte(write.location.getType());
+ buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+ write = write.getNext();
+ }
+
+ Buffer sequence = buff.toBuffer();
+
+ // Now we can fill in the batch control record properly.
+ buff.reset();
+ buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+ buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ if( journal.isChecksum() ) {
+ Checksum checksum = new Adler32();
+ checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeLong(checksum.getValue());
+ }
+
+ // Now do the 1 big write.
+ file.seek(wb.offset);
+ file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+
+ ReplicationTarget replicationTarget = journal.getReplicationTarget();
+ if( replicationTarget!=null ) {
+ replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+ }
+
+ if (forceToDisk) {
+ IOHelper.sync(file.getFD());
+ }
+
+ WriteCommand lastWrite = wb.writes.getTail();
+ journal.setLastAppendLocation(lastWrite.location);
+
+ // Now that the data is on disk, remove the writes from the in
+ // flight
+ // cache.
+ write = wb.writes.getHead();
+ while (write != null) {
+ if (!write.sync) {
+ inflightWrites.remove(new WriteKey(write.location));
+ }
+ if (write.onComplete != null) {
+ try {
+ write.onComplete.run();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ write = write.getNext();
+ }
+
+ // Signal any waiting threads that the write is on disk.
+ wb.latch.countDown();
+ }
+ } catch (IOException e) {
+ synchronized (enqueueMutex) {
+ firstAsyncException = e;
+ }
+ } catch (InterruptedException e) {
+ } finally {
+ try {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ } catch (Throwable ignore) {
+ }
+ shutdownDone.countDown();
+ }
+ }
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Journal.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.activemq.util.Scheduler;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.DataByteArrayInputStream;
+import org.apache.activemq.util.list.LinkedNodeList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawtdb.internal.journal.DataFileAppender.WriteCommand;
+import org.apache.hawtdb.internal.journal.DataFileAppender.WriteKey;
+
+/**
+ * Manages DataFiles
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Journal {
+
+ private static final int MAX_BATCH_SIZE = 32*1024*1024;
+
+ // RECORD_HEAD_SPACE = length (4 bytes) + type (1 byte)
+ public static final int RECORD_HEAD_SPACE = 4 + 1;
+
+ public static final byte USER_RECORD_TYPE = 1;
+ public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
+ // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
+ public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
+ public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
+
+ public static final String DEFAULT_DIRECTORY = ".";
+ public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
+ public static final String DEFAULT_FILE_PREFIX = "db-";
+ public static final String DEFAULT_FILE_SUFFIX = ".log";
+ public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+ public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
+ public static final int PREFERED_DIFF = 1024 * 512;
+
+ private static final Log LOG = LogFactory.getLog(Journal.class);
+
+ protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+
+ protected File directory = new File(DEFAULT_DIRECTORY);
+ protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
+ protected String filePrefix = DEFAULT_FILE_PREFIX;
+ protected String fileSuffix = DEFAULT_FILE_SUFFIX;
+ protected boolean started;
+
+ protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+ protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
+
+ protected DataFileAppender appender;
+ protected DataFileAccessorPool accessorPool;
+
+ protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+ protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
+ protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
+
+ protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+ protected Runnable cleanupTask;
+ protected final AtomicLong totalLength = new AtomicLong();
+ protected boolean archiveDataLogs;
+ private ReplicationTarget replicationTarget;
+ protected boolean checksum;
+
+ /**
+  * Starts the journal; does nothing if it is already started.
+  * <p>
+  * Creates the accessor pool and the appender, registers every file in the
+  * journal directory whose name is prefix + integer + suffix, makes sure a
+  * write file exists, runs the recovery check on the last data file and
+  * schedules the periodic cleanup task.
+  *
+  * @throws IOException if the current write file cannot be obtained
+  */
+ public synchronized void start() throws IOException {
+     if (started) {
+         return;
+     }
+
+     final long startedAt = System.currentTimeMillis();
+     accessorPool = new DataFileAccessorPool(this);
+     started = true;
+     preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
+
+     appender = new DataFileAppender(this);
+
+     final File[] candidates = directory.listFiles(new FilenameFilter() {
+         public boolean accept(File dir, String name) {
+             return dir.equals(directory) && name.startsWith(filePrefix) && name.endsWith(fileSuffix);
+         }
+     });
+
+     if (candidates != null) {
+         for (File candidate : candidates) {
+             try {
+                 String name = candidate.getName();
+                 String idText = name.substring(filePrefix.length(), name.length()-fileSuffix.length());
+                 int id = Integer.parseInt(idText);
+                 DataFile discovered = new DataFile(candidate, id, preferedFileLength);
+                 fileMap.put(discovered.getDataFileId(), discovered);
+                 totalLength.addAndGet(discovered.getLength());
+             } catch (NumberFormatException e) {
+                 // Not one of our data files: the part between prefix and
+                 // suffix is not a number.
+             }
+         }
+
+         // Link the data files together in sorted order.
+         List<DataFile> ordered = new ArrayList<DataFile>(fileMap.values());
+         Collections.sort(ordered);
+         for (DataFile df : ordered) {
+             dataFiles.addLast(df);
+             fileByFileMap.put(df.getFile(), df);
+         }
+     }
+
+     getCurrentWriteFile();
+     try {
+         lastAppendLocation.set(recoveryCheck(dataFiles.getTail()));
+     } catch (IOException e) {
+         LOG.warn("recovery check failed", e);
+     }
+
+     cleanupTask = new Runnable() {
+         public void run() {
+             cleanup();
+         }
+     };
+     Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+     final long finishedAt = System.currentTimeMillis();
+     LOG.trace("Startup took: "+(finishedAt-startedAt)+" ms");
+ }
+
+ private static byte[] bytes(String string) {
+ try {
+ return string.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+  * Scans the data file from offset 0, batch by batch, and stops at the
+  * first position that does not hold a valid batch control record (wrong
+  * record size, type or magic, an implausible batch size, or - when
+  * checksums are enabled - a checksum mismatch). The data file's length is
+  * then set to that position.
+  *
+  * @param dataFile the data file to check
+  * @return the location (file id and offset) just past the last valid batch
+  * @throws IOException if the data file accessor cannot be opened
+  */
+ protected Location recoveryCheck(DataFile dataFile) throws IOException {
+     byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+     DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
+
+     Location location = new Location();
+     location.setDataFileId(dataFile.getDataFileId());
+     location.setOffset(0);
+
+     DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+     try {
+         while( true ) {
+             reader.read(location.getOffset(), controlRecord);
+             controlIs.restart();
+
+             // Assert that it's a batch record.
+             if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
+                 break;
+             }
+             if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE ) {
+                 break;
+             }
+             // A mismatch must end the whole scan, not just this comparison
+             // loop, so remember the outcome and test it afterwards.
+             boolean magicMatches = true;
+             for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length; i++ ) {
+                 if( controlIs.readByte() != BATCH_CONTROL_RECORD_MAGIC[i] ) {
+                     magicMatches = false;
+                     break;
+                 }
+             }
+             if( !magicMatches ) {
+                 break;
+             }
+
+             // A negative size would fail the array allocation below or move
+             // the scan backwards, so treat it as corruption too.
+             int size = controlIs.readInt();
+             if( size < 0 || size > MAX_BATCH_SIZE ) {
+                 break;
+             }
+
+             if( isChecksum() ) {
+
+                 long expectedChecksum = controlIs.readLong();
+
+                 byte data[] = new byte[size];
+                 reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
+
+                 Checksum checksum = new Adler32();
+                 checksum.update(data, 0, data.length);
+
+                 if( expectedChecksum!=checksum.getValue() ) {
+                     break;
+                 }
+
+             }
+
+
+             location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+         }
+
+     } catch (IOException e) {
+         // Deliberately ignored: a failed read ends the scan at the last
+         // valid batch (presumably this is how the end of the file is
+         // reached - TODO confirm against DataFileAccessor.read).
+     } finally {
+         accessorPool.closeDataFileAccessor(reader);
+     }
+
+     dataFile.setLength(location.getOffset());
+     return location;
+ }
+
+ /**
+  * Adds the given number of bytes to the journal's total length counter.
+  *
+  * @param size the number of bytes to add
+  */
+ void addToTotalLength(int size) {
+ totalLength.addAndGet(size);
+ }
+
+
+ synchronized DataFile getCurrentWriteFile() throws IOException {
+ if (dataFiles.isEmpty()) {
+ rotateWriteFile();
+ }
+ return dataFiles.getTail();
+ }
+
+ synchronized DataFile rotateWriteFile() {
+ int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+ File file = getFile(nextNum);
+ DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+ // actually allocate the disk space
+ fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+ fileByFileMap.put(file, nextWriteFile);
+ dataFiles.addLast(nextWriteFile);
+ return nextWriteFile;
+ }
+
+ /**
+  * Builds the File for a data file number: the journal directory plus
+  * prefix + number + suffix.
+  */
+ public File getFile(int nextNum) {
+     return new File(directory, filePrefix + nextNum + fileSuffix);
+ }
+
+ synchronized DataFile getDataFile(Location item) throws IOException {
+ Integer key = Integer.valueOf(item.getDataFileId());
+ DataFile dataFile = fileMap.get(key);
+ if (dataFile == null) {
+ LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+ throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
+ }
+ return dataFile;
+ }
+
+ /**
+  * Returns the File backing the data file that the location points into.
+  *
+  * @throws IOException if no data file with the location's file id is known
+  */
+ synchronized File getFile(Location item) throws IOException {
+     // Same lookup, error log and exception as getDataFile(Location).
+     return getDataFile(item).getFile();
+ }
+
+ /**
+  * Returns the data file linked after the given one, as reported by
+  * {@code dataFile.getNext()}.
+  */
+ private DataFile getNextDataFile(DataFile dataFile) {
+ return dataFile.getNext();
+ }
+
+ /**
+  * Stops the journal; does nothing if it is not started. Cancels the
+  * cleanup task, closes the accessor pool and the appender, and forgets all
+  * data files and the last append location.
+  */
+ public synchronized void close() throws IOException {
+     if (started) {
+         Scheduler.cancel(cleanupTask);
+         accessorPool.close();
+         appender.close();
+
+         fileMap.clear();
+         fileByFileMap.clear();
+         dataFiles.clear();
+         lastAppendLocation.set(null);
+
+         started = false;
+     }
+ }
+
+ /**
+  * Periodic housekeeping: asks the accessor pool, if there is one, to
+  * dispose of unused accessors.
+  */
+ synchronized void cleanup() {
+     if (accessorPool == null) {
+         return;
+     }
+     accessorPool.disposeUnused();
+ }
+
+ /**
+  * Deletes every data file of the journal and resets it to an empty state
+  * with a fresh accessor pool and appender.
+  *
+  * @return true only if every data file reported a successful delete
+  */
+ public synchronized boolean delete() throws IOException {
+
+     // Release all open file handles before touching the files.
+     appender.close();
+     accessorPool.close();
+
+     boolean allDeleted = true;
+     for (DataFile dataFile : fileMap.values()) {
+         totalLength.addAndGet(-dataFile.getLength());
+         allDeleted &= dataFile.delete();
+     }
+
+     fileMap.clear();
+     fileByFileMap.clear();
+     lastAppendLocation.set(null);
+     dataFiles = new LinkedNodeList<DataFile>();
+
+     // Start over with new file handles.
+     accessorPool = new DataFileAccessorPool(this);
+     appender = new DataFileAppender(this);
+     return allDeleted;
+ }
+
+ public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
+ for (Integer key : files) {
+ // Can't remove the data file (or subsequent files) that is currently being written to.
+ if( key >= lastAppendLocation.get().getDataFileId() ) {
+ continue;
+ }
+ DataFile dataFile = fileMap.get(key);
+ if( dataFile!=null ) {
+ forceRemoveDataFile(dataFile);
+ }
+ }
+ }
+
+ private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
+ accessorPool.disposeDataFileAccessors(dataFile);
+ fileByFileMap.remove(dataFile.getFile());
+ fileMap.remove(dataFile.getDataFileId());
+ totalLength.addAndGet(-dataFile.getLength());
+ dataFile.unlink();
+ if (archiveDataLogs) {
+ dataFile.move(getDirectoryArchive());
+ LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
+ } else {
+ if ( dataFile.delete() ) {
+ LOG.debug("Discarded data file " + dataFile);
+ } else {
+ LOG.warn("Failed to discard data file " + dataFile.getFile());
+ }
+ }
+ }
+
+ /**
+ * @return the maxFileLength
+ */
+ public int getMaxFileLength() {
+ return maxFileLength;
+ }
+
+ /**
+ * @param maxFileLength the maxFileLength to set
+ */
+ public void setMaxFileLength(int maxFileLength) {
+ this.maxFileLength = maxFileLength;
+ }
+
+ /**
+ * @return the journal directory's string form
+ */
+ public String toString() {
+ return directory.toString();
+ }
+
+ /**
+  * Records an append that was made to the journal files from outside this
+  * journal instance.
+  * <p>
+  * If the location is in the current tail file, that file's length is
+  * increased by {@code length}. If it is in the file with the next id, that
+  * file is registered as the new tail. Anything else is rejected.
+  *
+  * @throws IOException if the location is in neither of those two files
+  */
+ public synchronized void appendedExternally(Location loc, int length) throws IOException {
+     final DataFile tail = dataFiles.getTail();
+     final int targetId = loc.getDataFileId();
+
+     if( tail.getDataFileId() == targetId ) {
+         // It's an update to the current log file..
+         tail.incrementLength(length);
+         return;
+     }
+
+     if( tail.getDataFileId()+1 != targetId ) {
+         throw new IOException("Invalid external append.");
+     }
+
+     // It's an update to the next log file.
+     final File file = getFile(targetId);
+     final DataFile next = new DataFile(file, targetId, preferedFileLength);
+     fileMap.put(next.getDataFileId(), next);
+     fileByFileMap.put(file, next);
+     dataFiles.addLast(next);
+ }
+
+ /**
+ * Finds the next user record after the given location.
+ *
+ * @param location the record to continue from; null starts at offset 0 of
+ * the first data file. A location whose size is -1 is used as the
+ * starting position itself instead of being skipped over.
+ * @return the location of the next record of type USER_RECORD_TYPE, or
+ * null if there are no data files, the end of the last data file is
+ * reached, or a record of type 0 is read
+ * @throws IOException if the data file for the current position is unknown
+ */
+ public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
+
+ Location cur = null;
+ while (true) {
+ if (cur == null) {
+ // First pass: work out where to start reading.
+ if (location == null) {
+ DataFile head = dataFiles.getHead();
+ if( head == null ) {
+ return null;
+ }
+ cur = new Location();
+ cur.setDataFileId(head.getDataFileId());
+ cur.setOffset(0);
+ } else {
+ // Set to the next offset..
+ if (location.getSize() == -1) {
+ cur = new Location(location);
+ } else {
+ cur = new Location(location);
+ cur.setOffset(location.getOffset() + location.getSize());
+ }
+ }
+ } else {
+ // Later passes: step over the record that was just read and skipped.
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+
+ DataFile dataFile = getDataFile(cur);
+
+ // Did it go into the next file??
+ if (dataFile.getLength() <= cur.getOffset()) {
+ dataFile = getNextDataFile(dataFile);
+ if (dataFile == null) {
+ return null;
+ } else {
+ cur.setDataFileId(dataFile.getDataFileId().intValue());
+ cur.setOffset(0);
+ }
+ }
+
+ // Load in location size and type.
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(cur);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+
+ if (cur.getType() == 0) {
+ return null;
+ } else if (cur.getType() == USER_RECORD_TYPE) {
+ // Only return user records.
+ return cur;
+ }
+ // Any other record type is skipped by looping again.
+ }
+ }
+
+ /**
+  * Same as the DataFile variant of this method, with the data file looked
+  * up by its File.
+  */
+ public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
+     return getNextLocation(fileByFileMap.get(file), lastLocation, thisFileOnly);
+ }
+
+ /**
+ * Finds the next record after the given location, reading from the given
+ * data file.
+ *
+ * @param dataFile the data file to read from
+ * @param lastLocation the record to continue from; null starts at offset 0
+ * @param thisFileOnly if true, return null instead of moving on to the
+ * next data file when the end of {@code dataFile} is reached
+ * @return the location of the next record with a type greater than 0, or
+ * null if the end is reached or a record of type 0 is read
+ */
+ public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
+
+ Location cur = null;
+ while (true) {
+ if (cur == null) {
+ if (lastLocation == null) {
+ // NOTE(review): the start position takes its file id from
+ // dataFile.getHeadNode(), while the reads below use dataFile
+ // itself - confirm these are meant to be the same file.
+ DataFile head = dataFile.getHeadNode();
+ cur = new Location();
+ cur.setDataFileId(head.getDataFileId());
+ cur.setOffset(0);
+ } else {
+ // Set to the next offset..
+ cur = new Location(lastLocation);
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+ } else {
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+
+ // Did it go into the next file??
+ if (dataFile.getLength() <= cur.getOffset()) {
+ if (thisFileOnly) {
+ return null;
+ } else {
+ dataFile = getNextDataFile(dataFile);
+ if (dataFile == null) {
+ return null;
+ } else {
+ cur.setDataFileId(dataFile.getDataFileId().intValue());
+ cur.setOffset(0);
+ }
+ }
+ }
+
+ // Load in location size and type.
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(cur);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+
+ if (cur.getType() == 0) {
+ return null;
+ } else if (cur.getType() > 0) {
+ // NOTE(review): this returns every record with a positive type,
+ // which includes batch control records (type 2), whereas
+ // getNextLocation(Location) returns only USER_RECORD_TYPE -
+ // confirm the difference is intended.
+ return cur;
+ }
+ }
+ }
+
+ /**
+  * Reads the record stored at the given location.
+  *
+  * @throws IOException if the location's data file is unknown or the read
+  *         fails
+  */
+ public synchronized Buffer read(Location location) throws IOException, IllegalStateException {
+     final DataFileAccessor reader = accessorPool.openDataFileAccessor(getDataFile(location));
+     try {
+         return reader.readRecord(location);
+     } finally {
+         accessorPool.closeDataFileAccessor(reader);
+     }
+ }
+
+ /**
+  * Appends a user record to the journal.
+  *
+  * @param sync passed through to the appender's storeItem
+  * @return the location assigned to the record
+  */
+ public synchronized Location write(Buffer data, boolean sync) throws IOException, IllegalStateException {
+     return appender.storeItem(data, Location.USER_TYPE, sync);
+ }
+
+ /**
+  * Appends a user record to the journal, handing the callback to the
+  * appender's storeItem.
+  *
+  * @return the location assigned to the record
+  */
+ public synchronized Location write(Buffer data, Runnable onComplete) throws IOException, IllegalStateException {
+     return appender.storeItem(data, Location.USER_TYPE, onComplete);
+ }
+
+ /**
+  * Overwrites the record at the given location with new data.
+  *
+  * @param sync passed through to the accessor's updateRecord
+  * @throws IOException if the location's data file is unknown or the update
+  *         fails
+  */
+ public void update(Location location, Buffer data, boolean sync) throws IOException {
+     final DataFileAccessor updater = accessorPool.openDataFileAccessor(getDataFile(location));
+     try {
+         updater.updateRecord(location, data, sync);
+     } finally {
+         accessorPool.closeDataFileAccessor(updater);
+     }
+ }
+
+ /** @return the directory holding the journal's data files */
+ public File getDirectory() {
+ return directory;
+ }
+
+ /** @param directory the directory holding the journal's data files */
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ /** @return the file name prefix of data files */
+ public String getFilePrefix() {
+ return filePrefix;
+ }
+
+ /** @param filePrefix the file name prefix of data files */
+ public void setFilePrefix(String filePrefix) {
+ this.filePrefix = filePrefix;
+ }
+
+ /** @return the journal's map of in-flight writes (the map itself, not a copy) */
+ public Map<WriteKey, WriteCommand> getInflightWrites() {
+ return inflightWrites;
+ }
+
+ /** @return the last append location, or null if none is set */
+ public Location getLastAppendLocation() {
+ return lastAppendLocation.get();
+ }
+
+ /** @param lastSyncedLocation the new last append location */
+ public void setLastAppendLocation(Location lastSyncedLocation) {
+ this.lastAppendLocation.set(lastSyncedLocation);
+ }
+
+ /** @return the directory that removed data files are moved to when archiving is enabled */
+ public File getDirectoryArchive() {
+ return directoryArchive;
+ }
+
+ /** @param directoryArchive the directory that removed data files are moved to when archiving is enabled */
+ public void setDirectoryArchive(File directoryArchive) {
+ this.directoryArchive = directoryArchive;
+ }
+
+ /** @return true if removed data files are archived instead of deleted */
+ public boolean isArchiveDataLogs() {
+ return archiveDataLogs;
+ }
+
+ /** @param archiveDataLogs true to archive removed data files instead of deleting them */
+ public void setArchiveDataLogs(boolean archiveDataLogs) {
+ this.archiveDataLogs = archiveDataLogs;
+ }
+
+ /** @return the id of the last data file in the list, or null if there are no data files */
+ synchronized public Integer getCurrentDataFileId() {
+ if (dataFiles.isEmpty())
+ return null;
+ return dataFiles.getTail().getDataFileId();
+ }
+
+ /**
+ * Get a set of files - only valid after start()
+ *
+ * @return files currently being used (the key set of the internal map,
+ * not a copy)
+ */
+ public Set<File> getFiles() {
+ return fileByFileMap.keySet();
+ }
+
+ /**
+ * @return a new TreeMap holding a copy of the id-to-data-file map
+ */
+ public Map<Integer, DataFile> getFileMap() {
+ return new TreeMap<Integer, DataFile>(fileMap);
+ }
+
+ /**
+  * Returns the journal's size on disk: the total length counter, with the
+  * last data file counted as at least preferedFileLength bytes.
+  */
+ public long getDiskSize() {
+     long tailLength = 0;
+     synchronized( this ) {
+         if( !dataFiles.isEmpty() ) {
+             tailLength = dataFiles.getTail().getLength();
+         }
+     }
+
+     final long recorded = totalLength.get();
+     if( tailLength >= preferedFileLength ) {
+         return recorded;
+     }
+
+     // The last file is actually at a minimum preferedFileLength big, so
+     // swap its recorded length for that minimum.
+     return recorded - tailLength + preferedFileLength;
+ }
+
+ /** @param replicationTarget the replication target to use; may be null */
+ public void setReplicationTarget(ReplicationTarget replicationTarget) {
+ this.replicationTarget = replicationTarget;
+ }
+ /** @return the replication target, or null if none is set */
+ public ReplicationTarget getReplicationTarget() {
+ return replicationTarget;
+ }
+
+ /** @return the file name suffix of data files */
+ public String getFileSuffix() {
+ return fileSuffix;
+ }
+
+ /** @param fileSuffix the file name suffix of data files */
+ public void setFileSuffix(String fileSuffix) {
+ this.fileSuffix = fileSuffix;
+ }
+
+ /** @return true if batch checksums are enabled */
+ public boolean isChecksum() {
+ return checksum;
+ }
+
+ /** @param checksumWrites true to enable batch checksums */
+ public void setChecksum(boolean checksumWrites) {
+ this.checksum = checksumWrites;
+ }
+
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Used as a location in the data store.
+ * <p>
+ * A location carries a data file id, an offset, a record size and a record
+ * type. Ordering, equality and hashing look at the data file id and the
+ * offset only; size, type and latch do not take part.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public final class Location implements Comparable<Location> {
+
+    public static final byte USER_TYPE = 1;
+    public static final byte NOT_SET_TYPE = 0;
+    public static final int NOT_SET = -1;
+
+    private int dataFileId = NOT_SET;
+    private int offset = NOT_SET;
+    private int size = NOT_SET;
+    private byte type = NOT_SET_TYPE;
+    private CountDownLatch latch;
+
+    /**
+     * Creates a location whose numeric fields are all {@link #NOT_SET} and
+     * whose type is {@link #NOT_SET_TYPE}.
+     */
+    public Location() {
+    }
+
+    /**
+     * Copies the data file id, offset, size and type of another location.
+     * The latch of {@code item} is not copied.
+     *
+     * @param item the location to copy from
+     */
+    public Location(Location item) {
+        this.dataFileId = item.dataFileId;
+        this.offset = item.offset;
+        this.size = item.size;
+        this.type = item.type;
+    }
+
+    /**
+     * Creates a location for the given file and offset; size and type keep
+     * their "not set" defaults.
+     *
+     * @param dataFileId the id of the data file
+     * @param offset the offset within that file
+     */
+    public Location(int dataFileId, int offset) {
+        this.dataFileId = dataFileId;
+        this.offset = offset;
+    }
+
+    /**
+     * @return {@code true} when a data file id has been assigned, i.e. it
+     *         differs from {@link #NOT_SET}
+     */
+    boolean isValid() {
+        return dataFileId != NOT_SET;
+    }
+
+    /**
+     * @return the size of the data record including the header.
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @param size the size of the data record including the header.
+     */
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    /**
+     * @return the offset of this location
+     */
+    public int getOffset() {
+        return offset;
+    }
+
+    /**
+     * @param offset the offset of this location
+     */
+    public void setOffset(int offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * @return the id of the data file this location refers to
+     */
+    public int getDataFileId() {
+        return dataFileId;
+    }
+
+    /**
+     * @param file the id of the data file this location refers to
+     */
+    public void setDataFileId(int file) {
+        this.dataFileId = file;
+    }
+
+    /**
+     * @return the record type, e.g. {@link #USER_TYPE} or {@link #NOT_SET_TYPE}
+     */
+    public byte getType() {
+        return type;
+    }
+
+    /**
+     * @param type the record type
+     */
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the location formatted as {@code dataFileId:offset}
+     */
+    @Override
+    public String toString() {
+        return dataFileId + ":" + offset;
+    }
+
+    /**
+     * Writes data file id, offset and size as ints followed by the type as a
+     * single byte. The latch is not written.
+     *
+     * @param dos the output to write to
+     * @throws IOException if the underlying output fails
+     */
+    public void writeExternal(DataOutput dos) throws IOException {
+        dos.writeInt(dataFileId);
+        dos.writeInt(offset);
+        dos.writeInt(size);
+        dos.writeByte(type);
+    }
+
+    /**
+     * Reads the fields back in the order used by
+     * {@link #writeExternal(DataOutput)}.
+     *
+     * @param dis the input to read from
+     * @throws IOException if the underlying input fails
+     */
+    public void readExternal(DataInput dis) throws IOException {
+        dataFileId = dis.readInt();
+        offset = dis.readInt();
+        size = dis.readInt();
+        type = dis.readByte();
+    }
+
+    /**
+     * @return the latch attached to this location, or {@code null} if none
+     *         has been set
+     */
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+
+    /**
+     * @param latch the latch to attach to this location
+     */
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    /**
+     * Orders locations by data file id first and by offset second.
+     * <p>
+     * The fields are compared explicitly instead of being subtracted:
+     * an int difference overflows when the operands are far apart (for
+     * example {@code Integer.MIN_VALUE} and {@code 1}) and then carries the
+     * wrong sign.
+     *
+     * @param o the location to compare with
+     * @return a negative value, zero or a positive value when this location
+     *         sorts before, equal to or after {@code o}
+     */
+    public int compareTo(Location o) {
+        if (dataFileId != o.dataFileId) {
+            return dataFileId < o.dataFileId ? -1 : 1;
+        }
+        if (offset != o.offset) {
+            return offset < o.offset ? -1 : 1;
+        }
+        return 0;
+    }
+
+    /**
+     * Two locations are equal when they have the same data file id and the
+     * same offset.
+     *
+     * @param o the object to compare with
+     * @return {@code true} when {@code o} is a {@code Location} that compares
+     *         as equal to this one
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Location)) {
+            return false;
+        }
+        return compareTo((Location) o) == 0;
+    }
+
+    /**
+     * @return a hash built from the same two fields that {@link #equals(Object)}
+     *         inspects
+     */
+    @Override
+    public int hashCode() {
+        return dataFileId ^ offset;
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/Location.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyDataFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * Allows you to open a data file in read only mode. Useful when working with
+ * archived data files.
+ * <p>
+ * Files are always opened with mode {@code "r"}; {@link #delete()} and
+ * {@link #move(File)} are rejected with an
+ * {@link UnsupportedOperationException}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ReadOnlyDataFile extends DataFile {
+
+    /**
+     * Passes all three arguments straight to the {@code DataFile} constructor.
+     *
+     * @param file the file on disk
+     * @param number the data file number
+     * @param preferedSize the preferred size handed to the superclass
+     */
+    ReadOnlyDataFile(File file, int number, int preferedSize) {
+        super(file, number, preferedSize);
+    }
+
+    /**
+     * Opens the underlying file for reading.
+     *
+     * @param appender ignored; the file is opened read only either way
+     * @return a new {@link RandomAccessFile} opened with mode {@code "r"}
+     * @throws IOException if the file cannot be opened
+     */
+    public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+        return new RandomAccessFile(file, "r");
+    }
+
+    /**
+     * Closes a file previously handed out by
+     * {@link #openRandomAccessFile(boolean)}.
+     *
+     * @param file the file to close
+     * @throws IOException if closing fails
+     */
+    public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+        file.close();
+    }
+
+    /**
+     * Always fails: a read only file cannot be deleted.
+     *
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    public synchronized boolean delete() throws IOException {
+        // UnsupportedOperationException is still a RuntimeException, so
+        // callers that catch the broader type keep working.
+        throw new UnsupportedOperationException("Not valid on a read only file.");
+    }
+
+    /**
+     * Always fails: a read only file cannot be moved.
+     *
+     * @param targetDirectory ignored
+     * @throws UnsupportedOperationException always
+     */
+    public synchronized void move(File targetDirectory) throws IOException {
+        throw new UnsupportedOperationException("Not valid on a read only file.");
+    }
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReadOnlyJournal.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Journal that works in read only mode against multiple data directories.
+ * Useful for reading back archived data files.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ReadOnlyJournal extends Journal {
+
+    private final ArrayList<File> dirs;
+
+    /**
+     * @param dirs the directories to scan for data files when the journal is
+     *        started; the list is kept by reference, not copied
+     */
+    public ReadOnlyJournal(final ArrayList<File> dirs) {
+        this.dirs = dirs;
+    }
+
+    /**
+     * Scans every configured directory for files whose name starts with
+     * {@code filePrefix}, registers each one whose remaining name parses as an
+     * integer as a {@link ReadOnlyDataFile}, and links the data files together
+     * in sorted order. Calling it on a started journal does nothing.
+     * <p>
+     * The tail-file recovery check (a {@code recoveryCheck} call that was
+     * present here only as commented-out code) is not performed.
+     *
+     * @throws IOException if one of the directories cannot be listed
+     */
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
+        List<File> files = new ArrayList<File>();
+        for (File directory : dirs) {
+            final File d = directory;
+            File[] found = d.listFiles(new FilenameFilter() {
+                public boolean accept(File dir, String n) {
+                    return dir.equals(d) && n.startsWith(filePrefix);
+                }
+            });
+            // listFiles() yields null when the path is not a directory or an
+            // I/O error occurs; report that instead of failing with an NPE.
+            if (found == null) {
+                throw new IOException("Could not list data files in directory: " + d);
+            }
+            Collections.addAll(files, found);
+        }
+
+        // Only mark the journal as started once the scan has succeeded, so a
+        // failed start can be retried.
+        started = true;
+
+        for (File file : files) {
+            try {
+                // NOTE(review): everything after filePrefix is parsed as the
+                // file number, so a name carrying a non-numeric suffix fails
+                // the parse and the file is skipped - confirm this is intended.
+                String n = file.getName();
+                String numStr = n.substring(filePrefix.length(), n.length());
+                int num = Integer.parseInt(numStr);
+                DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+                fileMap.put(dataFile.getDataFileId(), dataFile);
+                totalLength.addAndGet(dataFile.getLength());
+            } catch (NumberFormatException ignored) {
+                // Ignore files that do not match the pattern.
+            }
+        }
+
+        // Sort the list so that we can link the DataFiles together in the
+        // right order.
+        List<DataFile> list = new ArrayList<DataFile>(fileMap.values());
+        Collections.sort(list);
+        for (DataFile df : list) {
+            dataFiles.addLast(df);
+            fileByFileMap.put(df.getFile(), df);
+        }
+    }
+
+    /**
+     * Closes the accessor pool, clears {@code fileMap} and
+     * {@code fileByFileMap} and marks the journal as not started. Calling it
+     * on a journal that is not started does nothing.
+     * <p>
+     * NOTE(review): {@code dataFiles} and {@code totalLength} are not reset
+     * here, so a later {@link #start()} appends to and adds onto the old
+     * values - confirm whether restarting is expected to work.
+     *
+     * @throws IOException if closing the accessor pool fails
+     */
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        accessorPool.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        started = false;
+    }
+
+    /**
+     * Finds the first location of the journal by asking
+     * {@code getNextLocation} for the successor of a zero-sized location at
+     * offset 0 of the head data file.
+     *
+     * @return the result of that look-up, or {@code null} when there are no
+     *         data files
+     * @throws IOException if reading the journal fails
+     */
+    public Location getFirstLocation() throws IllegalStateException, IOException {
+        if (dataFiles.isEmpty()) {
+            return null;
+        }
+
+        DataFile first = dataFiles.getHead();
+        Location cur = new Location();
+        cur.setDataFileId(first.getDataFileId());
+        cur.setOffset(0);
+        cur.setSize(0);
+        return getNextLocation(cur);
+    }
+
+    /**
+     * Always fails: a read only journal cannot be deleted.
+     *
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public synchronized boolean delete() throws IOException {
+        // UnsupportedOperationException is still a RuntimeException, so
+        // callers that catch the broader type keep working.
+        throw new UnsupportedOperationException("Cannot delete a ReadOnlyJournal");
+    }
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/ReplicationTarget.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Single-method interface for objects that accept {@code replicate} calls.
+ * <p>
+ * NOTE(review): presumably the journal invokes this for data it has written
+ * so that the data can be copied elsewhere - confirm against the caller.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface ReplicationTarget {
+
+ /**
+  * @param location presumably the journal location the data belongs to - TODO confirm
+  * @param sequence presumably the bytes to replicate - TODO confirm
+  * @param sync presumably whether the originating write was synchronous - TODO confirm
+  */
+ void replicate(Location location, Buffer sequence, boolean sync);
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,23 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+Journal-based data storage: scalable and fast.
+</body>
+</html>
Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/package.html
------------------------------------------------------------------------------
svn:executable = *
|