Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 82809 invoked from network); 3 Feb 2008 20:43:45 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 3 Feb 2008 20:43:45 -0000 Received: (qmail 92774 invoked by uid 500); 3 Feb 2008 20:43:37 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 92748 invoked by uid 500); 3 Feb 2008 20:43:37 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 92739 invoked by uid 99); 3 Feb 2008 20:43:37 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Feb 2008 12:43:37 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Feb 2008 20:43:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E12391A9832; Sun, 3 Feb 2008 12:43:20 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r618082 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async: AsyncDataManager.java DataFile.java ReadOnlyAsyncDataManager.java ReadOnlyDataFile.java Date: Sun, 03 Feb 2008 20:43:20 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080203204320.E12391A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Sun Feb 3 12:43:17 2008 New Revision: 618082 URL: http://svn.apache.org/viewvc?rev=618082&view=rev Log: Adding a ReadOnlyAsyncDataManager so that you can access a set of data files in a read only way. Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=618082&r1=618081&r2=618082&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Sun Feb 3 12:43:17 2008 @@ -50,7 +50,7 @@ * * @version $Revision: 1.1.1.1 $ */ -public final class AsyncDataManager { +public class AsyncDataManager { public static final int CONTROL_RECORD_MAX_LENGTH = 1024; public static final int ITEM_HEAD_RESERVED_SPACE = 21; @@ -75,28 +75,28 @@ protected final Map inflightWrites = new ConcurrentHashMap(); - File directory = new File(DEFAULT_DIRECTORY); - File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY); - String filePrefix = DEFAULT_FILE_PREFIX; - ControlFile controlFile; - boolean started; - boolean useNio = true; - - private int maxFileLength = DEFAULT_MAX_FILE_LENGTH; - private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512; - - private DataFileAppender appender; - private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); - - private Map fileMap = new HashMap(); - private Map fileByFileMap = new LinkedHashMap(); - private DataFile currentWriteFile; - - private Location mark; - private final AtomicReference lastAppendLocation = new AtomicReference(); - private Runnable cleanupTask; - private final AtomicLong storeSize; - private boolean archiveDataLogs; + protected File directory = new File(DEFAULT_DIRECTORY); + protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY); + protected String filePrefix = DEFAULT_FILE_PREFIX; + protected ControlFile controlFile; + protected boolean started; + protected boolean useNio = true; + + protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; + protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512; + + protected DataFileAppender appender; + protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); + + protected Map fileMap = new HashMap(); + protected Map fileByFileMap = new LinkedHashMap(); + protected DataFile currentWriteFile; + + protected Location mark; + protected final AtomicReference lastAppendLocation = new AtomicReference(); + protected Runnable cleanupTask; + protected final AtomicLong storeSize; + protected boolean archiveDataLogs; public AsyncDataManager(AtomicLong storeSize) { this.storeSize=storeSize; @@ -194,7 +194,7 @@ Scheduler.executePeriodically(cleanupTask, 1000 * 30); } - private Location recoveryCheck(DataFile dataFile, Location location) throws IOException { + protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException { if (location == null) { location = new Location(); location.setDataFileId(dataFile.getDataFileId()); @@ -213,7 +213,7 @@ return location; } - private void unmarshallState(ByteSequence sequence) throws IOException { + protected void unmarshallState(ByteSequence sequence) throws IOException { ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength()); DataInputStream dis = new DataInputStream(bais); if (dis.readBoolean()) { @@ -596,7 +596,7 @@ storeState(sync); } - private synchronized void storeState(boolean sync) throws IOException { + protected synchronized void storeState(boolean sync) throws IOException { ByteSequence state = marshallState(); appender.storeItem(state, Location.MARK_TYPE, sync); controlFile.store(state, sync); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?rev=618082&r1=618081&r2=618082&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Sun Feb 3 12:43:17 2008 @@ -28,14 +28,14 @@ * * @version $Revision: 1.1.1.1 $ */ -class DataFile extends LinkedNode implements Comparable { +public class DataFile extends LinkedNode implements Comparable { - private final File file; - private final Integer dataFileId; - private final int preferedSize; + protected final File file; + protected final Integer dataFileId; + protected final int preferedSize; - private int length; - private int referenceCount; + protected int length; + protected int referenceCount; DataFile(File file, int number, int preferedSize) { this.file = file; Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java?rev=618082&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java Sun Feb 3 12:43:17 2008 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.activemq.thread.Scheduler; +import org.apache.activemq.util.ByteSequence; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * An AsyncDataManager that works in read only mode against multiple data directories. + * Useful for reading back archived data files. + */ +public class ReadOnlyAsyncDataManager extends AsyncDataManager { + + private static final Log LOG = LogFactory.getLog(ReadOnlyAsyncDataManager.class); + private final ArrayList dirs; + + public ReadOnlyAsyncDataManager(final ArrayList dirs) { + this.dirs = dirs; + } + + @SuppressWarnings("unchecked") + public synchronized void start() throws IOException { + if (started) { + return; + } + + started = true; + + ArrayList files = new ArrayList(); + for (File directory : dirs) { + final File d = directory; + File[] f = d.listFiles(new FilenameFilter() { + public boolean accept(File dir, String n) { + return dir.equals(d) && n.startsWith(filePrefix); + } + }); + for (int i = 0; i < f.length; i++) { + files.add(f[i]); + } + } + + for (File file : files) { + try { + String n = file.getName(); + String numStr = n.substring(filePrefix.length(), n.length()); + int num = Integer.parseInt(numStr); + DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength); + fileMap.put(dataFile.getDataFileId(), dataFile); + storeSize.addAndGet(dataFile.getLength()); + } catch (NumberFormatException e) { + // Ignore file that do not match the pattern. + } + } + + // Sort the list so that we can link the DataFiles together in the + // right order. + List dataFiles = new ArrayList(fileMap.values()); + Collections.sort(dataFiles); + currentWriteFile = null; + for (DataFile df : dataFiles) { + if (currentWriteFile != null) { + currentWriteFile.linkAfter(df); + } + currentWriteFile = df; + fileByFileMap.put(df.getFile(), df); + } + + // Need to check the current Write File to see if there was a partial + // write to it. + if (currentWriteFile != null) { + + // See if the lastSyncedLocation is valid.. + Location l = lastAppendLocation.get(); + if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) { + l = null; + } + } + } + + public synchronized void close() throws IOException { + if (!started) { + return; + } + accessorPool.close(); + fileMap.clear(); + fileByFileMap.clear(); + started = false; + } + + + public Location getFirstLocation() throws IllegalStateException, IOException { + if( currentWriteFile == null ) { + return null; + } + + DataFile first = (DataFile)currentWriteFile.getHeadNode(); + Location cur = new Location(); + cur.setDataFileId(first.getDataFileId()); + cur.setOffset(0); + cur.setSize(0); + return getNextLocation(cur); + } + + @Override + public synchronized boolean delete() throws IOException { + throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager"); + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java?rev=618082&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java Sun Feb 3 12:43:17 2008 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.apache.activemq.util.IOHelper; + +/** + * Allows you to open a data file in read only mode. Useful when working with + * archived data files. + */ +public class ReadOnlyDataFile extends DataFile { + + ReadOnlyDataFile(File file, int number, int preferedSize) { + super(file, number, preferedSize); + } + + + public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException { + RandomAccessFile rc = new RandomAccessFile(file, "r"); + // When we start to write files size them up so that the OS has a chance + // to allocate the file contigously. + if (appender) { + if (length < preferedSize) { + rc.setLength(preferedSize); + } + } + return rc; + } + + public void closeRandomAccessFile(RandomAccessFile file) throws IOException { + file.close(); + } + + public synchronized boolean delete() throws IOException { + throw new RuntimeException("Not valid on a read only file."); + } + + public synchronized void move(File targetDirectory) throws IOException{ + throw new RuntimeException("Not valid on a read only file."); + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java ------------------------------------------------------------------------------ svn:keywords = Rev Date