Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm
Reply-To: dev@jackrabbit.apache.org
Subject: svn commit: r1521876 [1/2] - in /jackrabbit/trunk/jackrabbit-aws-ext: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/jackrabbit/ src/main/java/org/apache/jackrabbit/aws/ src/main/java/org/apache/j...
Date: Wed, 11 Sep 2013 14:35:03 -0000
To: commits@jackrabbit.apache.org
From: thomasm@apache.org
Message-Id: <20130911143504.389F82388980@eris.apache.org>

Author: thomasm
Date: Wed Sep 11 14:35:02 2013
New Revision: 1521876

URL: http://svn.apache.org/r1521876
Log:
JCR-3651 S3 Datastore implementation (initial commit)

Added:
    jackrabbit/trunk/jackrabbit-aws-ext/   (with props)
    jackrabbit/trunk/jackrabbit-aws-ext/README.txt
    jackrabbit/trunk/jackrabbit-aws-ext/pom.xml
    jackrabbit/trunk/jackrabbit-aws-ext/src/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestLocalCache.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/aws.properties
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/log4j.properties
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml

Propchange: jackrabbit/trunk/jackrabbit-aws-ext/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Sep 11 14:35:02 2013
@@ -0,0 +1,10 @@
+target
+*.iws
+*.ipr
+*.iml
+junit*.properties
+.*
+*.xml.md5
+*-pom-snapshot-version
+.checkstyle
+

Added: jackrabbit/trunk/jackrabbit-aws-ext/README.txt
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/README.txt?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/README.txt (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/README.txt Wed Sep 11 14:35:02 2013
@@ -0,0 +1,28 @@
+====================================================
+Welcome to Jackrabbit Amazon WebServices Extension
+====================================================
+
+This is the Amazon WebServices Extension component of the Apache Jackrabbit project.
+This component contains the S3 Datastore, which stores binaries on Amazon S3 (http://aws.amazon.com/s3).
+
+====================================================
+Build Instructions
+====================================================
+To build the latest SNAPSHOT versions of all the components
+included here, run the following command with Maven 3:
+
+    mvn clean install
+
+To run the test cases that store data in an S3 bucket, pass the AWS config file via a system property, for example:
+
+    mvn clean install -DargLine="-Dconfig=/opt/cq/aws.properties"
+
+A sample aws.properties is located at src/test/resources/aws.properties.
+
+====================================================
+Configuration Instructions
+====================================================
+The S3 Datastore is configured through an aws.properties file.
+
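For reference, here is a minimal sketch of what such an aws.properties file needs to contain. The key names are taken from S3Constants and Utils.openService() below (openService() parses connectionTimeout, socketTimeout, maxConnections and maxErrorRetry with Integer.parseInt, so all four must be present); every value is an illustrative placeholder, not a default shipped with this commit:

    accessKey=<your AWS access key>
    secretKey=<your AWS secret key>
    s3Bucket=<your bucket name>
    s3Region=<your S3 region identifier>
    connectionTimeout=120000
    socketTimeout=120000
    maxConnections=10
    maxErrorRetry=3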
Added: jackrabbit/trunk/jackrabbit-aws-ext/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/pom.xml?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/pom.xml (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/pom.xml Wed Sep 11 14:35:02 2013
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements. See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.jackrabbit</groupId>
+    <artifactId>jackrabbit-parent</artifactId>
+    <version>2.8-SNAPSHOT</version>
+    <relativePath>../jackrabbit-parent/pom.xml</relativePath>
+  </parent>
+
+  <artifactId>jackrabbit-aws-ext</artifactId>
+  <name>Jackrabbit AWS Extension</name>
+  <description>Jackrabbit extension to Amazon Web Services</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>javax.jcr</groupId>
+      <artifactId>jcr</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-jcr-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.5.6</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.5</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.7.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <includes>
+            <include>**/aws/**/TestAll.java</include>
+          </includes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
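A downstream project would pull in the new module with a dependency along these lines (illustrative; the version is inherited from jackrabbit-parent, 2.8-SNAPSHOT at the time of this commit):

    <dependency>
      <groupId>org.apache.jackrabbit</groupId>
      <artifactId>jackrabbit-aws-ext</artifactId>
      <version>2.8-SNAPSHOT</version>
    </dependency>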
Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.data.LazyFileInputStream;
+import org.apache.jackrabbit.util.TransientFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements an LRU cache used by {@link CachingDataStore}. If the
+ * cache size exceeds its limit, the cache enters purge mode; in purge mode any
+ * cache operation is a no-op. After a purge the cache size is below
+ * cachePurgeResizeFactor * maximum size.
+ */
+public class LocalCache {
+
+    /**
+     * Logger instance.
+     */
+    static final Logger LOG = LoggerFactory.getLogger(LocalCache.class);
+
+    /**
+     * The file names of the files that need to be deleted.
+     */
+    final Set<String> toBeDeleted = new HashSet<String>();
+
+    /**
+     * The LRU cache that maps file name to file length.
+     */
+    LRUCache cache;
+
+    /**
+     * The directory where the files are created.
+     */
+    private final File directory;
+
+    /**
+     * The directory where tmp files are created.
+     */
+    private final File tmp;
+
+    /**
+     * The maximum size of cache in bytes.
+     */
+    private long maxSize;
+
+    /**
+     * If true the cache is in purge mode and not available. All operations are
+     * no-ops.
+     */
+    private volatile boolean purgeMode;
+
+    /**
+     * Build an LRU cache of the files located at 'path'. It uses the
+     * lastModified property of the files to build the LRU order. If the cache
+     * size exceeds the limit, the cache enters purge mode, in which any cache
+     * operation is a no-op.
+     *
+     * @param path file system path
+     * @param tmpPath temporary directory used by the cache.
+     * @param maxSize maximum size of the cache.
+     * @param cachePurgeTrigFactor factor that triggers purge mode: if the
+     * current size exceeds (cachePurgeTrigFactor * maxSize), the cache goes
+     * into auto-purge mode.
+     * @param cachePurgeResizeFactor after a purge, the cache size will be
+     * just below (cachePurgeResizeFactor * maxSize).
+     * @throws RepositoryException
+     */
+    public LocalCache(final String path, final String tmpPath,
+            final long maxSize, final double cachePurgeTrigFactor,
+            final double cachePurgeResizeFactor) throws RepositoryException {
+        this.maxSize = maxSize;
+        directory = new File(path);
+        tmp = new File(tmpPath);
+        cache = new LRUCache(maxSize, cachePurgeTrigFactor,
+            cachePurgeResizeFactor);
+        ArrayList<File> allFiles = new ArrayList<File>();
+
+        Iterator<File> it = FileUtils.iterateFiles(directory, null, true);
+        while (it.hasNext()) {
+            File f = it.next();
+            allFiles.add(f);
+        }
+        Collections.sort(allFiles, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                long l1 = o1.lastModified(), l2 = o2.lastModified();
+                return l1 < l2 ? -1 : l1 > l2 ? 1 : 0;
+            }
+        });
+        String dataStorePath = directory.getAbsolutePath();
+        long time = System.currentTimeMillis();
+        int count = 0;
+        int deletecount = 0;
+        for (File f : allFiles) {
+            if (f.exists()) {
+                long length = f.length();
+                String name = f.getPath();
+                if (name.startsWith(dataStorePath)) {
+                    name = name.substring(dataStorePath.length());
+                }
+                // convert to java path format
+                name = name.replace("\\", "/");
+                if (name.startsWith("/") || name.startsWith("\\")) {
+                    name = name.substring(1);
+                }
+                if ((cache.currentSizeInBytes + length) < cache.maxSizeInBytes) {
+                    count++;
+                    cache.put(name, length);
+                } else {
+                    if (tryDelete(name)) {
+                        deletecount++;
+                    }
+                }
+                long now = System.currentTimeMillis();
+                if (now > time + 5000) {
+                    LOG.info("Processed {" + (count + deletecount) + "}/{"
+                        + allFiles.size() + "}");
+                    time = now;
+                }
+            }
+        }
+        LOG.info("Cached {" + count + "}/{" + allFiles.size()
+            + "}, currentSizeInBytes = " + cache.currentSizeInBytes);
+        LOG.info("Deleted {" + deletecount + "}/{" + allFiles.size()
+            + "} files.");
+    }
+
+    /**
+     * Store an item in the cache and return the input stream. If the cache is
+     * in purge mode or the file doesn't exist, an input stream over a
+     * {@link TransientFileFactory#createTransientFile(String, String, File)}
+     * is returned. Otherwise an input stream over the cached file is returned.
+     * This method doesn't close the incoming input stream.
+     *
+     * @param fileName the key of cache.
+     * @param in the input stream.
+     * @return the (new) input stream.
+     */
+    public synchronized InputStream store(String fileName, final InputStream in)
+            throws IOException {
+        fileName = fileName.replace("\\", "/");
+        File f = getFile(fileName);
+        long length = 0;
+        if (!f.exists() || isInPurgeMode()) {
+            OutputStream out = null;
+            File transFile = null;
+            try {
+                TransientFileFactory tff = TransientFileFactory.getInstance();
+                transFile = tff.createTransientFile("s3-", "tmp", tmp);
+                out = new BufferedOutputStream(new FileOutputStream(transFile));
+                length = IOUtils.copyLarge(in, out);
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+            // rename the file to local fs cache
+            if (canAdmitFile(length)
+                && (f.getParentFile().exists() || f.getParentFile().mkdirs())
+                && transFile.renameTo(f) && f.exists()) {
+                if (transFile.exists() && !transFile.delete()) {
+                    LOG.warn("tmp file = " + transFile.getAbsolutePath()
+                        + " not deleted successfully");
+                }
+                transFile = null;
+                toBeDeleted.remove(fileName);
+                if (cache.get(fileName) == null) {
+                    cache.put(fileName, f.length());
+                }
+            } else {
+                f = transFile;
+            }
+        } else {
+            // f.exists and not in purge mode
+            f.setLastModified(System.currentTimeMillis());
+            toBeDeleted.remove(fileName);
+            if (cache.get(fileName) == null) {
+                cache.put(fileName, f.length());
+            }
+        }
+        cache.tryPurge();
+        return new LazyFileInputStream(f);
+    }
+
+    /**
+     * Store an item along with its file in the cache. The cache size is
+     * increased by {@link File#length()}. If the file already exists in the
+     * cache, {@link File#setLastModified(long)} is updated with the current
+     * time.
+     *
+     * @param fileName the key of cache.
+     * @param src file to be added to cache.
+     * @throws IOException
+     */
+    public synchronized void store(String fileName, final File src)
+            throws IOException {
+        fileName = fileName.replace("\\", "/");
+        File dest = getFile(fileName);
+        File parent = dest.getParentFile();
+        if (src.exists() && !dest.exists() && !src.equals(dest)
+            && canAdmitFile(src.length())
+            && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest))) {
+            toBeDeleted.remove(fileName);
+            if (cache.get(fileName) == null) {
+                cache.put(fileName, dest.length());
+            }
+
+        } else if (dest.exists()) {
+            dest.setLastModified(System.currentTimeMillis());
+            toBeDeleted.remove(fileName);
+            if (cache.get(fileName) == null) {
+                cache.put(fileName, dest.length());
+            }
+        }
+        cache.tryPurge();
+    }
+
+    /**
+     * Return the input stream from the cache, or null if the item is not in
+     * the cache.
+     *
+     * @param fileName name of file.
+     * @return stream or null.
+     */
+    public InputStream getIfStored(String fileName) throws IOException {
+
+        fileName = fileName.replace("\\", "/");
+        File f = getFile(fileName);
+        synchronized (this) {
+            if (!f.exists() || isInPurgeMode()) {
+                log("purgeMode true or file doesn't exist: getIfStored returned");
+                return null;
+            }
+            f.setLastModified(System.currentTimeMillis());
+            return new LazyFileInputStream(f);
+        }
+    }
+
+    /**
+     * Delete a file from the cache. The cache size is reduced by the file
+     * length. The method is a no-op if the file doesn't exist in the cache.
+     *
+     * @param fileName file name that needs to be removed from the cache.
+     */
+    public synchronized void delete(String fileName) {
+        if (isInPurgeMode()) {
+            log("purgeMode true: delete returned");
+            return;
+        }
+        fileName = fileName.replace("\\", "/");
+        cache.remove(fileName);
+    }
+
+    /**
+     * Returns the length of the file if it exists in the cache, otherwise
+     * null.
+     *
+     * @param fileName name of the file.
+     */
+    public Long getFileLength(String fileName) {
+        fileName = fileName.replace("\\", "/");
+        File f = getFile(fileName);
+        synchronized (this) {
+            if (!f.exists() || isInPurgeMode()) {
+                log("purgeMode true or file doesn't exist: getFileLength returned");
+                return null;
+            }
+            f.setLastModified(System.currentTimeMillis());
+            return f.length();
+        }
+    }
+
+    /**
+     * Close the cache. The cache maintains a set of files it was not able to
+     * delete successfully; this method makes another attempt to delete them.
+     */
+    public void close() {
+        log("close");
+        deleteOldFiles();
+    }
+
+    /**
+     * Check if the cache can admit a file of the given length.
+     *
+     * @param length length of the file.
+     * @return true if it can, false otherwise.
+     */
+    private synchronized boolean canAdmitFile(final long length) {
+        // order is important here
+        boolean value = !isInPurgeMode() && cache.canAdmitFile(length);
+        if (!value) {
+            log("cannot admit file of length=" + length
+                + " and currentSizeInBytes=" + cache.currentSizeInBytes);
+        }
+        return value;
+    }
+
+    /**
+     * Return true if the cache is in purge mode, false otherwise.
+     */
+    synchronized boolean isInPurgeMode() {
+        return purgeMode || maxSize == 0;
+    }
+
+    /**
+     * Set purge mode. If set to true, all cache operations are no-ops. If set
+     * to false, all cache operations are available again.
+     *
+     * @param purgeMode purge mode
+     */
+    synchronized void setPurgeMode(final boolean purgeMode) {
+        this.purgeMode = purgeMode;
+    }
+
+    File getFile(final String fileName) {
+        return new File(directory, fileName);
+    }
+
+    private void deleteOldFiles() {
+        int initialSize = toBeDeleted.size();
+        int count = 0;
+        for (String n : new ArrayList<String>(toBeDeleted)) {
+            if (tryDelete(n)) {
+                count++;
+            }
+        }
+        LOG.info("deleted [" + count + "]/[" + initialSize + "] files");
+    }
+
+    /**
+     * This method tries to delete a file. If the file cannot be deleted for
+     * any reason, it is added to the toBeDeleted list.
+     *
+     * @param fileName name of the file which will be deleted.
+     * @return true if this method deletes the file successfully, false
+     * otherwise.
+     */
+    boolean tryDelete(final String fileName) {
+        log("cache delete " + fileName);
+        File f = getFile(fileName);
+        if (f.exists() && f.delete()) {
+            log(fileName + " deleted successfully");
+            toBeDeleted.remove(fileName);
+            while (true) {
+                f = f.getParentFile();
+                if (f.equals(directory) || f.list().length > 0) {
+                    break;
+                }
+                // delete empty parent folders (except the main directory)
+                f.delete();
+            }
+            return true;
+        } else if (f.exists()) {
+            LOG.info("not able to delete file = " + f.getAbsolutePath());
+            toBeDeleted.add(fileName);
+            return false;
+        }
+        return true;
+    }
+
+    static int maxSizeElements(final long bytes) {
+        // after a CQ installation, the average item in
+        // the data store is about 52 KB
+        int count = (int) (bytes / 65535);
+        count = Math.max(1024, count);
+        count = Math.min(64 * 1024, count);
+        return count;
+    }
+
+    static void log(final String s) {
+        LOG.debug(s);
+    }
+
+    /**
+     * An LRU-based extension of {@link LinkedHashMap}. The key is the file
+     * name and the value is the file length.
+     */
+    private class LRUCache extends LinkedHashMap<String, Long> {
+        private static final long serialVersionUID = 1L;
+
+        volatile long currentSizeInBytes;
+
+        final long maxSizeInBytes;
+
+        long cachePurgeResize;
+
+        private long cachePurgeTrigSize;
+
+        public LRUCache(final long maxSizeInBytes,
+                final double cachePurgeTrigFactor,
+                final double cachePurgeResizeFactor) {
+            super(maxSizeElements(maxSizeInBytes), (float) 0.75, true);
+            this.maxSizeInBytes = maxSizeInBytes;
+            this.cachePurgeTrigSize = new Double(cachePurgeTrigFactor
+                * maxSizeInBytes).longValue();
+            this.cachePurgeResize = new Double(cachePurgeResizeFactor
+                * maxSizeInBytes).longValue();
+        }
+
+        /**
+         * Overridden {@link Map#remove(Object)} to delete the corresponding
+         * file from the file system.
+         */
+        @Override
+        public synchronized Long remove(final Object key) {
+            String fileName = (String) key;
+            fileName = fileName.replace("\\", "/");
+            Long flength = null;
+            if (tryDelete(fileName)) {
+                flength = super.remove(key);
+                if (flength != null) {
+                    log("cache entry {" + fileName + "} with size {" + flength
+                        + "} removed.");
+                    currentSizeInBytes -= flength.longValue();
+                }
+            } else if (!getFile(fileName).exists()) {
+                // second attempt. remove from cache if the file doesn't exist
+                flength = super.remove(key);
+                if (flength != null) {
+                    log("file doesn't exist. cache entry {" + fileName
+                        + "} with size {" + flength + "} removed.");
+                    currentSizeInBytes -= flength.longValue();
+                }
+            }
+            return flength;
+        }
+
+        @Override
+        public synchronized Long put(final String key, final Long value) {
+            long flength = value.longValue();
+            currentSizeInBytes += flength;
+            return super.put(key.replace("\\", "/"), value);
+        }
+
+        /**
+         * This method tries to purge the local cache. If the local cache has
+         * exceeded the defined limit, it triggers a purge-cache job in a
+         * separate thread.
+         */
+        synchronized void tryPurge() {
+            if (currentSizeInBytes > cachePurgeTrigSize && !isInPurgeMode()) {
+                setPurgeMode(true);
+                LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes
+                    + "] exceeds (cachePurgeTrigSize)["
+                    + cache.cachePurgeTrigSize + "]");
+                new Thread(new PurgeJob()).start();
+            }
+        }
+
+        /**
+         * This method checks if the cache can admit a file of the given
+         * length.
+         *
+         * @param length length of file.
+         * @return true if cache size + length is less than maxSize.
+         */
+        synchronized boolean canAdmitFile(final long length) {
+            return cache.currentSizeInBytes + length < cache.maxSizeInBytes;
+        }
+    }
+
+    /**
+     * This class performs purging of the local cache. It implements
+     * {@link Runnable} and should be invoked in a separate thread.
+     */
+    private class PurgeJob implements Runnable {
+        public PurgeJob() {
+            // TODO Auto-generated constructor stub
+        }
+
+        /**
+         * This method purges the local cache until its size is less than
+         * cachePurgeResizeFactor * maxSize.
+         */
+        @Override
+        public void run() {
+            try {
+                synchronized (cache) {
+                    LOG.info(" cache purge job started");
+                    // first try to delete toBeDeleted files
+                    int initialSize = cache.size();
+                    for (String fileName : new ArrayList<String>(toBeDeleted)) {
+                        cache.remove(fileName);
+                    }
+                    Iterator<Map.Entry<String, Long>> itr = cache.entrySet().iterator();
+                    while (itr.hasNext()) {
+                        Map.Entry<String, Long> entry = itr.next();
+                        if (entry.getKey() != null) {
+                            if (cache.currentSizeInBytes > cache.cachePurgeResize) {
+                                itr.remove();
+
+                            } else {
+                                break;
+                            }
+                        }
+
+                    }
+                    LOG.info(" cache purge job completed: cleaned ["
+                        + (initialSize - cache.size())
+                        + "] files and currentSizeInBytes = [ "
+                        + cache.currentSizeInBytes + "]");
+                }
+            } catch (Exception e) {
+                LOG.error("error in purge jobs:", e);
+            } finally {
+                setPurgeMode(false);
+            }
+        }
+    }
+}
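To illustrate the LocalCache contract just defined, here is a minimal usage sketch. The class name LocalCacheExample, the paths, and the sizes are hypothetical; the constructor and method signatures are the ones from the class above:

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.InputStream;

    import org.apache.jackrabbit.aws.ext.LocalCache;

    public class LocalCacheExample {
        public static void main(String[] args) throws Exception {
            // the cache scans its directory on startup, so it must exist
            new File("/tmp/ds").mkdirs();
            new File("/tmp/ds-tmp").mkdirs();
            // 16 MB cache; purge triggers at 95% of the limit and
            // resizes to 85% (hypothetical values)
            LocalCache cache = new LocalCache("/tmp/ds", "/tmp/ds-tmp",
                    16L * 1024 * 1024, 0.95d, 0.85d);
            byte[] data = "hello".getBytes("UTF-8");
            // store() copies the stream into the cache and returns a new
            // stream; the incoming stream is not closed by the cache
            InputStream in = cache.store("ab/cd/ef/abcdef",
                    new ByteArrayInputStream(data));
            in.close();
            // getIfStored() returns null on a miss or while a purge runs
            InputStream cached = cache.getIfStored("ab/cd/ef/abcdef");
            if (cached != null) {
                cached.close();
            }
            cache.close();
        }
    }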
Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+/**
+ * Defines Amazon S3 constants.
+ */
+public final class S3Constants {
+
+    /**
+     * Amazon AWS access key.
+     */
+    public static final String ACCESS_KEY = "accessKey";
+
+    /**
+     * Amazon AWS secret key.
+     */
+    public static final String SECRET_KEY = "secretKey";
+
+    /**
+     * Amazon AWS S3 bucket.
+     */
+    public static final String S3_BUCKET = "s3Bucket";
+
+    /**
+     * Amazon AWS S3 region.
+     */
+    public static final String S3_REGION = "s3Region";
+
+    /**
+     * Private constructor so that the class cannot be initialized from
+     * outside.
+     */
+    private S3Constants() {
+
+    }
+
+}
Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+/**
+ * Amazon S3 utilities.
+ */
+public final class Utils {
+
+    public static final String DEFAULT_CONFIG_FILE = "aws.properties";
+
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+    private static final String DELETE_CONFIG_SUFFIX = ";burn";
+
+    /**
+     * Private constructor so that the class cannot be initialized from
+     * outside.
+     */
+    private Utils() {
+
+    }
+
+    /**
+     * Create an {@link AmazonS3Client} from properties.
+     *
+     * @param prop properties to configure the {@link AmazonS3Client}
+     * @return {@link AmazonS3Client}
+     */
+    public static AmazonS3Client openService(final Properties prop) {
+        AWSCredentials credentials = new BasicAWSCredentials(
+            prop.getProperty(S3Constants.ACCESS_KEY),
+            prop.getProperty(S3Constants.SECRET_KEY));
+        int connectionTimeOut = Integer.parseInt(prop.getProperty("connectionTimeout"));
+        int socketTimeOut = Integer.parseInt(prop.getProperty("socketTimeout"));
+        int maxConnections = Integer.parseInt(prop.getProperty("maxConnections"));
+        int maxErrorRetry = Integer.parseInt(prop.getProperty("maxErrorRetry"));
+        ClientConfiguration cc = new ClientConfiguration();
+        cc.setConnectionTimeout(connectionTimeOut);
+        cc.setSocketTimeout(socketTimeOut);
+        cc.setMaxConnections(maxConnections);
+        cc.setMaxErrorRetry(maxErrorRetry);
+        return new AmazonS3Client(credentials, cc);
+    }
+
+    /**
+     * Delete an S3 bucket. This method first deletes all objects from the
+     * bucket and then deletes the empty bucket itself.
+     *
+     * @param prop properties to configure the {@link AmazonS3Client} and
+     * delete the bucket.
+     */
+    public static void deleteBucket(final Properties prop) throws IOException {
+        AmazonS3Client s3service = openService(prop);
+        String bucketName = prop.getProperty(S3Constants.S3_BUCKET);
+        if (!s3service.doesBucketExist(bucketName)) {
+            return;
+        }
+        ObjectListing prevObjectListing = s3service.listObjects(bucketName);
+        while (true) {
+            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                    s3ObjSumm.getKey()));
+            }
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                    bucketName);
+                delObjsReq.setKeys(deleteList);
+                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                if (dobjs.getDeletedObjects().size() != deleteList.size()) {
+                    throw new IOException(
+                        "Incomplete delete object request. Only "
+                            + dobjs.getDeletedObjects().size() + " out of "
+                            + deleteList.size() + " were deleted");
+                }
+                LOG.info(deleteList.size()
+                    + " records deleted from datastore");
+            }
+            if (!prevObjectListing.isTruncated()) {
+                break;
+            }
+            prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+        }
+        s3service.deleteBucket(bucketName);
+        s3service.shutdown();
+    }
+
+    /**
+     * Read a configuration properties file. If the file name ends with
+     * ";burn", the file is deleted after reading.
+     *
+     * @param fileName the properties file name
+     * @return the properties
+     * @throws IOException if the file doesn't exist
+     */
+    public static Properties readConfig(String fileName) throws IOException {
+        boolean delete = false;
+        if (fileName.endsWith(DELETE_CONFIG_SUFFIX)) {
+            delete = true;
+            fileName = fileName.substring(0, fileName.length()
+                - DELETE_CONFIG_SUFFIX.length());
+        }
+        if (!new File(fileName).exists()) {
+            throw new IOException("Config file not found: " + fileName);
+        }
+        Properties prop = new Properties();
+        InputStream in = null;
+        try {
+            in = new FileInputStream(fileName);
+            prop.load(in);
+        } finally {
+            if (in != null) {
+                in.close();
+            }
+            if (delete) {
+                deleteIfPossible(new File(fileName));
+            }
+        }
+        return prop;
+    }
+
+    private static void deleteIfPossible(final File file) {
+        boolean deleted = file.delete();
+        if (!deleted) {
+            LOG.warn("Could not delete " + file.getAbsolutePath());
+        }
+    }
+
+}
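The intended call sequence for these helpers is sketched below. The properties path is a placeholder; readConfig() and openService() are the methods defined above, and doesBucketExist() is the AWS SDK call already used by deleteBucket():

    import java.util.Properties;

    import org.apache.jackrabbit.aws.ext.S3Constants;
    import org.apache.jackrabbit.aws.ext.Utils;

    import com.amazonaws.services.s3.AmazonS3Client;

    public class UtilsExample {
        public static void main(String[] args) throws Exception {
            // append ";burn" to the name to delete the file after reading
            Properties prop = Utils.readConfig("/opt/cq/aws.properties");
            AmazonS3Client s3 = Utils.openService(prop);
            try {
                String bucket = prop.getProperty(S3Constants.S3_BUCKET);
                System.out.println("bucket exists: "
                    + s3.doesBucketExist(bucket));
            } finally {
                s3.shutdown();
            }
        }
    }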
Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * This interface defines the backend that can be plugged into
+ * {@link CachingDataStore}.
+ */
+public interface Backend {
+
+    /**
+     * This method initializes the backend with the configuration.
+     *
+     * @param store {@link CachingDataStore}
+     * @param homeDir path of repository home dir.
+     * @param config path of config property file.
+     * @throws DataStoreException
+     */
+    void init(CachingDataStore store, String homeDir, String config)
+            throws DataStoreException;
+
+    /**
+     * Return an input stream for the record identified by identifier.
+     *
+     * @param identifier identifier of record.
+     * @return input stream of the record.
+     * @throws DataStoreException if record not found or any error.
+     */
+    InputStream read(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Return the length of the record identified by identifier.
+     *
+     * @param identifier identifier of record.
+     * @return length of the record.
+     * @throws DataStoreException if record not found or any error.
+     */
+    long getLength(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Return the lastModified of the record identified by identifier.
+     *
+     * @param identifier identifier of record.
+     * @return lastModified of the record.
+     * @throws DataStoreException if record not found or any error.
+     */
+    long getLastModified(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Stores a file in the backend with the identifier used as the key. If
+     * the key already exists, the timestamp of the key is updated.
+     *
+     * @param identifier key of the file
+     * @param file file that would be stored in the backend.
+     * @throws DataStoreException for any error.
+     */
+    void write(DataIdentifier identifier, File file) throws DataStoreException;
+
+    /**
+     * Returns identifiers of all records that exist in the backend.
+     *
+     * @return iterator consisting of all identifiers
+     * @throws DataStoreException
+     */
+    Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException;
+
+    /**
+     * Update the timestamp of the record identified by identifier if
+     * minModifiedDate is greater than the record's lastModified, otherwise a
+     * no-op.
+     *
+     * @throws DataStoreException if record not found.
+     */
+    void touch(DataIdentifier identifier, long minModifiedDate)
+            throws DataStoreException;
+
+    /**
+     * This method checks the existence of a record in the backend.
+     *
+     * @param identifier identifier to be checked.
+     * @return true if the record exists, false otherwise.
+     * @throws DataStoreException
+     */
+    boolean exists(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Close the backend and release resources, like a database connection,
+     * if any.
+     *
+     * @throws DataStoreException
+     */
+    void close() throws DataStoreException;
+
+    /**
+     * Delete all records which are older than timestamp.
+     *
+     * @param timestamp
+     * @return list of identifiers which are deleted.
+     * @throws DataStoreException
+     */
+    List<DataIdentifier> deleteAllOlderThan(long timestamp) throws DataStoreException;
+
+    /**
+     * Delete the record identified by identifier. No-op if the identifier is
+     * not found.
+     *
+     * @param identifier
+     * @throws DataStoreException
+     */
+    void deleteRecord(DataIdentifier identifier) throws DataStoreException;
+}
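To make the contract concrete, here is a hypothetical map-backed Backend sketch. This commit's test sources add an InMemoryBackend for the same purpose; the class below is illustrative, not the committed one, and is assumed to live in the same package as Backend:

    package org.apache.jackrabbit.aws.ext.ds;

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.io.FileUtils;
    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.apache.jackrabbit.core.data.DataStoreException;

    /** Hypothetical map-backed Backend, for illustration only. */
    public class MapBackend implements Backend {

        private final Map<DataIdentifier, byte[]> data = new HashMap<DataIdentifier, byte[]>();
        private final Map<DataIdentifier, Long> modified = new HashMap<DataIdentifier, Long>();

        public void init(CachingDataStore store, String homeDir, String config) {
            // nothing to configure for an in-memory map
        }

        public InputStream read(DataIdentifier id) throws DataStoreException {
            byte[] b = data.get(id);
            if (b == null) {
                throw new DataStoreException("Record not found: " + id);
            }
            return new ByteArrayInputStream(b);
        }

        public long getLength(DataIdentifier id) throws DataStoreException {
            byte[] b = data.get(id);
            if (b == null) {
                throw new DataStoreException("Record not found: " + id);
            }
            return b.length;
        }

        public long getLastModified(DataIdentifier id) throws DataStoreException {
            Long m = modified.get(id);
            if (m == null) {
                throw new DataStoreException("Record not found: " + id);
            }
            return m.longValue();
        }

        public void write(DataIdentifier id, File file) throws DataStoreException {
            try {
                data.put(id, FileUtils.readFileToByteArray(file));
                modified.put(id, Long.valueOf(System.currentTimeMillis()));
            } catch (IOException e) {
                throw new DataStoreException(e);
            }
        }

        public Iterator<DataIdentifier> getAllIdentifiers() {
            return new ArrayList<DataIdentifier>(data.keySet()).iterator();
        }

        public void touch(DataIdentifier id, long minModifiedDate)
                throws DataStoreException {
            // update the timestamp only if it is older than minModifiedDate
            if (minModifiedDate > getLastModified(id)) {
                modified.put(id, Long.valueOf(System.currentTimeMillis()));
            }
        }

        public boolean exists(DataIdentifier id) {
            return data.containsKey(id);
        }

        public void close() {
            // no resources to release
        }

        public List<DataIdentifier> deleteAllOlderThan(long timestamp) {
            List<DataIdentifier> deleted = new ArrayList<DataIdentifier>();
            for (DataIdentifier id : new ArrayList<DataIdentifier>(data.keySet())) {
                if (modified.get(id).longValue() < timestamp) {
                    data.remove(id);
                    modified.remove(id);
                    deleted.add(id);
                }
            }
            return deleted;
        }

        public void deleteRecord(DataIdentifier id) {
            data.remove(id);
            modified.remove(id);
        }
    }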
Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.InputStream;
+
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * CachingDataRecord stores a reference to the {@link CachingDataStore}. This
+ * class doesn't hold the record's attributes itself; they are fetched on
+ * demand from the {@link CachingDataStore}.
+ */
+public class CachingDataRecord extends AbstractDataRecord {
+
+    private final CachingDataStore store;
+
+    public CachingDataRecord(CachingDataStore store, DataIdentifier identifier) {
+        super(store, identifier);
+        this.store = store;
+    }
+
+    @Override
+    public long getLastModified() {
+        try {
+            return store.getLastModified(getIdentifier());
+        } catch (DataStoreException dse) {
+            return 0;
+        }
+    }
+
+    @Override
+    public long getLength() throws DataStoreException {
+        return store.getLength(getIdentifier());
+    }
+
+    @Override
+    public InputStream getStream() throws DataStoreException {
+        return store.getStream(getIdentifier());
+    }
+
+}
Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.ref.WeakReference;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.aws.ext.LocalCache;
+import org.apache.jackrabbit.core.data.AbstractDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A caching data store that consists of {@link LocalCache} and {@link Backend}.
+ * The {@link Backend} is the single source of truth. All methods first try to
+ * fetch information from the {@link LocalCache}. If a record is not available
+ * in the {@link LocalCache}, it is fetched from the {@link Backend} and saved
+ * in the {@link LocalCache} for further access. This class is designed to also
+ * work without the {@link LocalCache}, in which case all information is
+ * fetched from the {@link Backend}. To disable the {@link LocalCache}, set
+ * {@link #setCacheSize(long)} to 0.
+ *
+ * Configuration:
+ *
+ * <pre>
+ * <DataStore class="org.apache.jackrabbit.aws.ext.ds.CachingDataStore">
+ *     <param name="{@link #setPath(String) path}" value="/data/datastore"/>
+ *     <param name="{@link #setConfig(String) config}" value="${rep.home}/backend.properties"/>
+ *     <param name="{@link #setCacheSize(long) cacheSize}" value="68719476736"/>
+ *     <param name="{@link #setSecret(String) secret}" value="123456"/>
+ *     <param name="{@link #setCachePurgeTrigFactor(double) cachePurgeTrigFactor}" value="0.95d"/>
+ *     <param name="{@link #setCachePurgeResizeFactor(double) cachePurgeResizeFactor}" value="0.85d"/>
+ *     <param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
+ * </DataStore>
+ * </pre>
+ */
+public abstract class CachingDataStore extends AbstractDataStore implements
+        MultiDataStoreAware {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(CachingDataStore.class);
+
+    /**
+     * The digest algorithm used to uniquely identify records.
+     */
+    private static final String DIGEST = "SHA-1";
+
+    private static final String DS_STORE = ".DS_Store";
+    
+    /**
+     * Name of the directory used for temporary files. Must be at least 3
+     * characters.
+     */
+    private static final String TMP = "tmp";
+    
+    /**
+     * All data identifiers that are currently in use are in this set until they
+     * are garbage collected.
+     */
+    protected Map<DataIdentifier, WeakReference<DataIdentifier>> inUse =
+            Collections.synchronizedMap(new WeakHashMap<DataIdentifier, WeakReference<DataIdentifier>>());
+
+    protected Backend backend;
+
+    /**
+     * The minimum size of an object that should be stored in this data store.
+     */
+    private int minRecordLength = 16 * 1024;
+
+    private String path;
+
+    private File directory;
+
+    private File tmpDir;
+
+    private String secret;
+
+    /**
+     * The optional backend configuration.
+     */
+    private String config;
+
+    /**
+     * The minimum modified date. If a file is accessed (read or write) with a
+     * modified date older than this value, the modified date is updated to the
+     * current time.
+     */
+    private long minModifiedDate;
+
+    /**
+     * Cache purge trigger factor. The cache enters auto-purge mode when its
+     * current size exceeds cachePurgeTrigFactor * cacheSize.
+     */
+    private double cachePurgeTrigFactor = 0.95d;
+
+    /**
+     * Cache resize factor. After an auto-purge, the current cache size is
+     * just below cachePurgeResizeFactor * cacheSize.
+     */
+    private double cachePurgeResizeFactor = 0.85d;
+
+    /**
+     * The number of bytes in the cache. The default value is 64 GB.
+     */
+    private long cacheSize = 64L * 1024 * 1024 * 1024;
+
+    /**
+     * The local file system cache.
+     */
+    private LocalCache cache;
+
+    abstract Backend createBackend();
+
+    abstract String getMarkerFile();
+
+    /**
+     * Initializes the data store. If the path is not set, <repository
+     * home>/repository/datastore is used. This directory is automatically
+     * created if it does not yet exist. On first initialization, all files in
+     * the local data store are uploaded to the backend, and the local data
+     * store then acts as a local cache.
+     */
+    @Override
+    public void init(String homeDir) throws RepositoryException {
+        if (path == null) {
+            path = homeDir + "/repository/datastore";
+        }
+        directory = new File(path);
+        try {
+            mkdirs(directory);
+        } catch (IOException e) {
+            throw new DataStoreException("Could not create directory "
+                + directory.getAbsolutePath(), e);
+        }
+        tmpDir = new File(homeDir, "/repository/s3tmp");
+        try {
+            if (!mkdirs(tmpDir)) {
+                FileUtils.cleanDirectory(tmpDir);
+                LOG.info("tmp = " + tmpDir.getPath() + " cleaned");
+            }
+        } catch (IOException e) {
+            throw new DataStoreException("Could not create directory "
+                + tmpDir.getAbsolutePath(), e);
+        }
+        LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor
+            + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor);
+        backend = createBackend();
+        backend.init(this, path, config);
+        String markerFileName = getMarkerFile();
+        if (markerFileName != null) {
+            // create marker file in homeDir to avoid deletion in cache cleanup.
+            File markerFile = new File(homeDir, markerFileName);
+            if (!markerFile.exists()) {
+                LOG.info("load files from local cache");
+                loadFilesFromCache();
+                try {
+                    markerFile.createNewFile();
+                } catch (IOException e) {
+                    throw new DataStoreException(
+                        "Could not create marker file "
+                            + markerFile.getAbsolutePath(), e);
+                }
+            } else {
+                LOG.info("marker file = " + markerFile.getAbsolutePath()
+                    + " exists");
+            }
+        }
+        cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
+            cachePurgeTrigFactor, cachePurgeResizeFactor);
+    }
+
+    /**
+     * Creates a new data record in the {@link Backend}. The stream is first
+     * consumed: the contents are saved in a temporary file and the SHA-1
+     * message digest of the stream is calculated. If a record with the same
+     * SHA-1 digest (and length) already exists, it is returned. Otherwise a
+     * new record is created in the {@link Backend} and the temporary file is
+     * moved into place in the {@link LocalCache}.
+     * 
+     * @param input binary stream
+     * @return {@link CachingDataRecord}
+     * @throws DataStoreException if the record could not be created.
+     */
+    @Override
+    public DataRecord addRecord(InputStream input) throws DataStoreException {
+        File temporary = null;
+        try {
+            temporary = newTemporaryFile();
+            DataIdentifier tempId = new DataIdentifier(temporary.getName());
+            usesIdentifier(tempId);
+            // Copy the stream to the temporary file and calculate the
+            // stream length and the message digest of the stream
+            MessageDigest digest = MessageDigest.getInstance(DIGEST);
+            OutputStream output = new DigestOutputStream(new FileOutputStream(
+                temporary), digest);
+            try {
+                IOUtils.copyLarge(input, output);
+            } finally {
+                output.close();
+            }
+            DataIdentifier identifier = new DataIdentifier(
+                encodeHexString(digest.digest()));
+            synchronized (this) {
+                usesIdentifier(identifier);
+                backend.write(identifier, temporary);
+                String fileName = getFileName(identifier);
+                cache.store(fileName, temporary);
+            }
+            // this will also make sure that
+            // tempId is not garbage collected until here
+            inUse.remove(tempId);
+            return new CachingDataRecord(this, identifier);
+        } catch (NoSuchAlgorithmException e) {
+            throw new DataStoreException(DIGEST + " not available", e);
+        } catch (IOException e) {
+            throw new DataStoreException("Could not add record", e);
+        } finally {
+            if (temporary != null) {
+                // try to delete - but it's not a big deal if we can't
+                temporary.delete();
+            }
+        }
+    }
+
+    /**
+     * Get the data record for the given identifier, or null if the data
+     * record doesn't exist in the {@link Backend}.
+     * 
+     * @param identifier identifier of record.
+     * @return the {@link CachingDataRecord} or null.
+     */
+    @Override
+    public DataRecord getRecordIfStored(DataIdentifier identifier)
+            throws DataStoreException {
+        synchronized (this) {
+            usesIdentifier(identifier);
+            if (!backend.exists(identifier)) {
+                return null;
+            }
+            backend.touch(identifier, minModifiedDate);
+            return new CachingDataRecord(this, identifier);
+        }
+    }
+
+    @Override
+    public void updateModifiedDateOnAccess(long before) {
+        LOG.info("minModifiedDate set to: " + before);
+        minModifiedDate = before;
+    }
+    /**
+     * Retrieves all identifiers from {@link Backend}.
+     */
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers()
+            throws DataStoreException {
+        return backend.getAllIdentifiers();
+    }
+
+    /**
+     * This method deletes record from {@link Backend} and then from
+     * {@link LocalCache}
+     */
+    @Override
+    public void deleteRecord(DataIdentifier identifier)
+            throws DataStoreException {
+        String fileName = getFileName(identifier);
+        synchronized (this) {
+            backend.deleteRecord(identifier);
+            cache.delete(fileName);
+        }
+    }
+
+    @Override
+    public synchronized int deleteAllOlderThan(long min)
+            throws DataStoreException {
+        List<DataIdentifier> diList = backend.deleteAllOlderThan(min);
+        // remove entries from local cache
+        for (DataIdentifier identifier : diList) {
+            cache.delete(getFileName(identifier));
+        }
+        return diList.size();
+    }
+
+    /**
+     * Get stream of record from {@link LocalCache}. If record is not available
+     * in {@link LocalCache}, this method fetches record from {@link Backend}
+     * and stores it to {@link LocalCache}. Stream is then returned from cached
+     * record.
+     */
+    InputStream getStream(DataIdentifier identifier) throws DataStoreException {
+        InputStream in = null;
+        try {
+            String fileName = getFileName(identifier);
+            InputStream cached = cache.getIfStored(fileName);
+            if (cached != null) {
+                return cached;
+            }
+            in = backend.read(identifier);
+            return cache.store(fileName, in);
+        } catch (IOException e) {
+            throw new DataStoreException("IO Exception: " + identifier, e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    /**
+     * Return the lastModified of the record from the {@link Backend}, which
+     * is treated as the single source of truth.
+     */
+    long getLastModified(DataIdentifier identifier) throws DataStoreException {
+        LOG.info("accessed lastModified");
+        return backend.getLastModified(identifier);
+    }
+
+    /**
+     * Return the length of record from {@link LocalCache} if available,
+     * otherwise retrieve it from {@link Backend}.
+     */
+    long getLength(DataIdentifier identifier) throws DataStoreException {
+        String fileName = getFileName(identifier);
+        Long length = cache.getFileLength(fileName);
+        if (length != null) {
+            return length.longValue();
+        }
+        return backend.getLength(identifier);
+    }
+
+    @Override
+    protected byte[] getOrCreateReferenceKey() throws DataStoreException {
+        try {
+            return secret.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    /**
+     * Returns a unique temporary file to be used for creating a new data
+     * record.
+     */
+    private File newTemporaryFile() throws IOException {
+        return File.createTempFile(TMP, null, tmpDir);
+    }
+
+    /**
+     * Load files from {@link LocalCache} to {@link Backend}.
+     */
+    private void loadFilesFromCache() throws RepositoryException {
+        ArrayList<File> files = new ArrayList<File>();
+        listRecursive(files, directory);
+        long totalSize = 0;
+        for (File f : files) {
+            totalSize += f.length();
+        }
+        long currentSize = 0;
+        long time = System.currentTimeMillis();
+        for (File f : files) {
+            long now = System.currentTimeMillis();
+            if (now > time + 5000) {
+                LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+                time = now;
+            }
+            currentSize += f.length();
+            String name = f.getName();
+            LOG.debug("upload file = " + name);
+            if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
+                && f.length() > 0) {
+                loadFileToBackEnd(f);
+            }
+        }
+        LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+    }
+
+    /**
+     * Traverse recursively and populate list with files.
+     */
+    private void listRecursive(List<File> list, File file) {
+        File[] files = file.listFiles();
+        if (files != null) {
+            for (File f : files) {
+                if (f.isDirectory()) {
+                    listRecursive(list, f);
+                } else {
+                    list.add(f);
+                }
+            }
+        }
+    }
+    /**
+     * Upload file from {@link LocalCache} to {@link Backend}. 
+     * @param f file to be uploaded.
+     * @throws DataStoreException
+     */
+    private void loadFileToBackEnd(File f) throws DataStoreException {
+        DataIdentifier identifier = new DataIdentifier(f.getName());
+        usesIdentifier(identifier);
+        backend.write(identifier, f);
+        LOG.debug(f.getName() + " uploaded.");
+
+    }
+
+    /**
+     * Derive file name from identifier.
+     */
+    private static String getFileName(DataIdentifier identifier) {
+        String name = identifier.toString();
+        name = name.substring(0, 2) + "/" + name.substring(2, 4) + "/"
+            + name.substring(4, 6) + "/" + name;
+        return name;
+    }
+
+    private void usesIdentifier(DataIdentifier identifier) {
+        inUse.put(identifier, new WeakReference<DataIdentifier>(identifier));
+    }
+
+    private static boolean mkdirs(File dir) throws IOException {
+        if (dir.exists()) {
+            if (dir.isFile()) {
+                throw new IOException("Can not create a directory "
+                    + "because a file exists with the same name: "
+                    + dir.getAbsolutePath());
+            }
+            return false;
+        }
+        boolean created = dir.mkdirs();
+        if (!created) {
+            throw new IOException("Could not create directory: "
+                + dir.getAbsolutePath());
+        }
+        return created;
+    }
+
+    @Override
+    public void clearInUse() {
+        inUse.clear();
+    }
+
+    @Override
+    public void close() throws DataStoreException {
+        cache.close();
+        backend.close();
+        cache = null;
+    }
+
+    /**
+     * Setter for the configuration-based secret.
+     * 
+     * @param secret the secret used to sign reference binaries
+     */
+    public void setSecret(String secret) {
+        this.secret = secret;
+    }
+
+    /**
+     * Set the minimum object length.
+     * 
+     * @param minRecordLength the length
+     */
+    public void setMinRecordLength(int minRecordLength) {
+        this.minRecordLength = minRecordLength;
+    }
+
+    /**
+     * Return the minimum object length.
+     */
+    @Override
+    public int getMinRecordLength() {
+        return minRecordLength;
+    }
+
+    /**
+     * Return the path of the configuration properties file.
+     * 
+     * @return path of the configuration properties file.
+     */
+    public String getConfig() {
+        return config;
+    }
+
+    /**
+     * Set the path of the configuration properties file.
+     * 
+     * @param config path of the configuration properties file.
+     */
+    public void setConfig(String config) {
+        this.config = config;
+    }
+
+    /**
+     * @return size of the {@link LocalCache}.
+     */
+    public long getCacheSize() {
+        return cacheSize;
+    }
+
+    /**
+     * Set the size of the {@link LocalCache}.
+     * @param cacheSize size of the {@link LocalCache}.
+     */
+    public void setCacheSize(long cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    /**
+     * @return path of the {@link LocalCache}.
+     */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Set the path of the {@link LocalCache}.
+     * @param path path of the {@link LocalCache}.
+     */
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    /**
+     * @return Purge trigger factor of {@link LocalCache}.
+     */
+    public double getCachePurgeTrigFactor() {
+        return cachePurgeTrigFactor;
+    }
+
+    /**
+     * Set purge trigger factor of {@link LocalCache}.
+     * @param cachePurgeTrigFactor purge trigger factor.
+     */
+    public void setCachePurgeTrigFactor(double cachePurgeTrigFactor) {
+        this.cachePurgeTrigFactor = cachePurgeTrigFactor;
+    }
+
+    /**
+     * @return Purge resize factor of {@link LocalCache}.
+     */
+    public double getCachePurgeResizeFactor() {
+        return cachePurgeResizeFactor;
+    }
+
+    /**
+     * Set purge resize factor of {@link LocalCache}.
+     * @param cachePurgeResizeFactor purge resize factor.
+     */
+    public void setCachePurgeResizeFactor(double cachePurgeResizeFactor) {
+        this.cachePurgeResizeFactor = cachePurgeResizeFactor;
+    }
+
+}
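
A note on configuration: the bean-style setters above are the knobs a deployment
tunes (in a repository they would normally be driven declaratively, e.g. from the
repository_sample.xml shipped with this commit). A minimal programmatic sketch,
assuming the S3DataStore subclass defined further below; the values are only
illustrative, not shipped defaults, and the purge-factor comments are inferred
from the parameter names:

    S3DataStore store = new S3DataStore();
    store.setConfig("src/test/resources/aws.properties");
    store.setSecret("123456789");                 // signs reference binaries
    store.setMinRecordLength(16 * 1024);          // smaller records stay inline
    store.setCacheSize(64L * 1024 * 1024 * 1024); // 64 GB LocalCache
    store.setCachePurgeTrigFactor(0.95d);         // purge once ~95% full (assumed)
    store.setCachePurgeResizeFactor(0.85d);       // shrink back to ~85% (assumed)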

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.jackrabbit.aws.ext.S3Constants;
+import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * A data store backend that stores data on Amazon S3.
+ */
+public class S3Backend implements Backend {
+
+    private static final String KEY_PREFIX = "dataStore_";
+
+    /**
+     * The default AWS bucket region.
+     */
+    private static final String DEFAULT_AWS_BUCKET_REGION = "us-standard";
+
+    /**
+     * Constants used to build the endpoints of the various AWS regions.
+     */
+    private static final String AWSDOTCOM = "amazonaws.com";
+
+    private static final String S3 = "s3";
+
+    private static final String DOT = ".";
+
+    private static final String DASH = "-";
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+
+    private AmazonS3Client s3service;
+
+    private String bucket;
+
+    private TransferManager tmx;
+
+    private CachingDataStore store;
+
+    /**
+     * Initialize S3Backend. Creates the AmazonS3Client and TransferManager
+     * from aws.properties, and creates the S3 bucket if it does not exist yet.
+     */
+    @Override
+    public void init(CachingDataStore store, String homeDir, String config)
+            throws DataStoreException {
+        if (config == null) {
+            config = Utils.DEFAULT_CONFIG_FILE;
+        }
+        try {
+            Properties prop = Utils.readConfig(config);
+            LOG.debug("init");
+            this.store = store;
+            s3service = Utils.openService(prop);
+            bucket = prop.getProperty(S3Constants.S3_BUCKET);
+            String region = prop.getProperty(S3Constants.S3_REGION);
+            String endpoint = null;
+            if (!s3service.doesBucketExist(bucket)) {
+
+                if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+                    s3service.createBucket(bucket, Region.US_Standard);
+                    endpoint = S3 + DOT + AWSDOTCOM;
+                } else if (Region.EU_Ireland.toString().equals(region)) {
+                    s3service.createBucket(bucket, Region.EU_Ireland);
+                    endpoint = "s3-eu-west-1" + DOT + AWSDOTCOM;
+                } else {
+                    s3service.createBucket(bucket, region);
+                    endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
+                }
+                LOG.info("Created bucket: " + bucket + " in " + region);
+            } else {
+                LOG.info("Using bucket: " + bucket);
+                if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+                    endpoint = S3 + DOT + AWSDOTCOM;
+                } else if (Region.EU_Ireland.toString().equals(region)) {
+                    endpoint = "s3-eu-west-1" + DOT + AWSDOTCOM;
+                } else {
+                    endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
+                }
+            }
+            /*
+             * Set the endpoint to avoid the latency of a redirect. If the
+             * endpoint is not set, each invocation first goes to the US
+             * Standard region, which redirects it to the correct location.
+             */
+            s3service.setEndpoint(endpoint);
+            LOG.info("S3 service endpoint: " + endpoint);
+            tmx = new TransferManager(s3service, createDefaultExecutorService());
+            LOG.debug("  done");
+        } catch (Exception e) {
+            LOG.debug("  error ", e);
+            throw new DataStoreException("Could not initialize S3 from "
+                + config, e);
+        }
+    }
+
+    /**
+     * Uploads the file to Amazon S3. If the file size is greater than 5 MB,
+     * this method uses multiple concurrent connections for the upload.
+     */
+    @Override
+    public void write(DataIdentifier identifier, File file)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        ObjectMetadata objectMetaData = null;
+        long start = System.currentTimeMillis();
+        LOG.debug("write {0} length {1}", identifier, file.length());
+        try {
+            // check if the same record already exists
+            try {
+                objectMetaData = s3service.getObjectMetadata(bucket, key);
+            } catch (AmazonServiceException ase) {
+                if (ase.getStatusCode() != 404) {
+                    throw ase;
+                }
+            }
+            if (objectMetaData != null) {
+                long l = objectMetaData.getContentLength();
+                if (l != file.length()) {
+                    throw new DataStoreException("Collision: " + key
+                        + " new length: " + file.length() + " old length: " + l);
+                }
+                LOG.debug(key + "   exists");
+                CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
+                    bucket, key);
+                copReq.setNewObjectMetadata(objectMetaData);
+                s3service.copyObject(copReq);
+                LOG.debug("lastModified of " + identifier.toString()
+                    + " updated successfully");
+                LOG.debug("   updated");
+            }
+        } catch (AmazonServiceException e) {
+            LOG.debug("   does not exist", e);
+            // not found - create it
+        }
+        if (objectMetaData == null) {
+            LOG.debug("   creating");
+            try {
+                // start multipart parallel upload using amazon sdk
+                Upload up = tmx.upload(new PutObjectRequest(bucket, key, file));
+                // wait for upload to finish
+                up.waitForUploadResult();
+                LOG.debug("   done");
+            } catch (Exception e2) {
+                LOG.debug("   could not upload", e2);
+                throw new DataStoreException("Could not upload " + key, e2);
+            }
+        }
+        LOG.debug("    ms: {0}", System.currentTimeMillis() - start);
+
+    }
+
+    /**
+     * Check if record identified by identifier exists in Amazon S3.
+     */
+    @Override
+    public boolean exists(DataIdentifier identifier) throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            LOG.debug("exists {0}", identifier);
+            ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
+                key);
+            if (objectMetaData != null) {
+                LOG.debug("  true");
+                return true;
+            }
+            return false;
+        } catch (AmazonServiceException e) {
+            if (e.getStatusCode() == 404) {
+                LOG.info("key [" + identifier.toString() + "] not found.");
+                return false;
+            }
+            throw new DataStoreException(
+                "Error occured to getObjectMetadata for key ["
+                    + identifier.toString() + "]", e);
+        }
+    }
+
+    @Override
+    public void touch(DataIdentifier identifier, long minModifiedDate)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            if (minModifiedDate != 0) {
+                ObjectMetadata objectMetaData = s3service.getObjectMetadata(
+                    bucket, key);
+                if (objectMetaData.getLastModified().getTime() < minModifiedDate) {
+                    CopyObjectRequest copReq = new CopyObjectRequest(bucket,
+                        key, bucket, key);
+                    copReq.setNewObjectMetadata(objectMetaData);
+                    s3service.copyObject(copReq);
+                    LOG.debug("lastModified of " + identifier.toString()
+                        + " updated successfully");
+                }
+            }
+        } catch (Exception e) {
+            throw new DataStoreException(
+                "An Exception occurred while trying to set the last modified date of record "
+                    + identifier.toString(), e);
+        }
+    }
+
+    @Override
+    public InputStream read(DataIdentifier identifier)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            LOG.debug("read {" + identifier + "}");
+            S3Object object = s3service.getObject(bucket, key);
+            InputStream in = object.getObjectContent();
+            LOG.debug("  return");
+            return in;
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException("Object not found: " + key, e);
+        }
+    }
+
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers()
+            throws DataStoreException {
+        try {
+            LOG.debug("getAllIdentifiers");
+            Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
+            ObjectListing prevObjectListing = s3service.listObjects(bucket,
+                KEY_PREFIX);
+            while (true) {
+                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                    String id = getIdentifierName(s3ObjSumm.getKey());
+                    if (id != null) {
+                        ids.add(new DataIdentifier(id));
+                    }
+                }
+                if (!prevObjectListing.isTruncated()) {
+                    break;
+                }
+                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+            }
+            LOG.debug("  return");
+            return ids.iterator();
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException("Could not list objects", e);
+        }
+    }
+
+    @Override
+    public long getLastModified(DataIdentifier identifier)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+            return object.getLastModified().getTime();
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException(
+                "Could not getLastModified of dataIdentifier " + identifier, e);
+        }
+    }
+
+    @Override
+    public long getLength(DataIdentifier identifier) throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+            return object.getContentLength();
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException("Could not length of dataIdentifier "
+                + identifier, e);
+        }
+    }
+
+    @Override
+    public void deleteRecord(DataIdentifier identifier)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            s3service.deleteObject(bucket, key);
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException(
+                "Could not delete dataIdentifier " + identifier, e);
+        }
+    }
+
+    @Override
+    public List<DataIdentifier> deleteAllOlderThan(long min)
+            throws DataStoreException {
+        LOG.info("deleteAllOlderThan " + new Date(min));
+        List<DataIdentifier> diDeleteList = new ArrayList<DataIdentifier>(30);
+        ObjectListing prevObjectListing = s3service.listObjects(bucket);
+        while (true) {
+            List<DeleteObjectsRequest.KeyVersion> deleteList =
+                new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                DataIdentifier identifier = new DataIdentifier(
+                    getIdentifierName(s3ObjSumm.getKey()));
+                if (!store.inUse.containsKey(identifier)
+                    && s3ObjSumm.getLastModified().getTime() < min) {
+                    LOG.info("add id :" + s3ObjSumm.getKey()
+                        + " to delete lists");
+                    deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                        s3ObjSumm.getKey()));
+                    diDeleteList.add(new DataIdentifier(
+                        getIdentifierName(s3ObjSumm.getKey())));
+                }
+            }
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                    bucket);
+                delObjsReq.setKeys(deleteList);
+                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                if (dobjs.getDeletedObjects().size() != deleteList.size()) {
+                    throw new DataStoreException(
+                        "Incomplete delete object request. only  "
+                            + dobjs.getDeletedObjects().size() + " out of "
+                            + deleteList.size() + " are deleted");
+                }
+                LOG.info(deleteList.size() + " records deleted from datastore");
+            }
+            if (!prevObjectListing.isTruncated()) {
+                break;
+            }
+            prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+        }
+        LOG.info("deleteAllOlderThan  exit");
+        return diDeleteList;
+    }
+
+    @Override
+    public void close() {
+        s3service.shutdown();
+        s3service = null;
+        tmx = null;
+    }
+
+    /**
+     * Derive the S3 key from the data identifier. The object is stored
+     * under this key in S3.
+     */
+    private static String getKeyName(DataIdentifier identifier) {
+        return KEY_PREFIX + identifier.toString();
+    }
+
+    /**
+     * Get data identifier from key.
+     */
+    private static String getIdentifierName(String key) {
+        if (!key.startsWith(KEY_PREFIX)) {
+            return null;
+        }
+        return key.substring(KEY_PREFIX.length());
+    }
+
+    /**
+     * Returns a new thread pool configured with the default settings.
+     * 
+     * @return A new thread pool configured with the default settings.
+     */
+    private ThreadPoolExecutor createDefaultExecutorService() {
+        ThreadFactory threadFactory = new ThreadFactory() {
+            private int threadCount = 1;
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r);
+                thread.setContextClassLoader(getClass().getClassLoader());
+                thread.setName("s3-transfer-manager-worker-" + threadCount++);
+                return thread;
+            }
+        };
+        return (ThreadPoolExecutor) Executors.newFixedThreadPool(10,
+            threadFactory);
+    }
+}
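
For context, a record round-trip goes through the standard Jackrabbit DataStore
API that CachingDataStore implements; S3Backend only sees the write/read/exists
calls. A minimal sketch, assuming a valid aws.properties (the paths and the
bytes added are placeholders):

    import java.io.ByteArrayInputStream;
    import org.apache.jackrabbit.core.data.DataRecord;

    S3DataStore ds = new S3DataStore();           // subclass defined below
    ds.setConfig("/opt/cq/aws.properties");
    ds.init("/opt/cq/repository");                // DataStore#init(String homeDir)
    // addRecord stores the binary in the LocalCache and, via S3Backend#write,
    // in the S3 bucket
    DataRecord rec = ds.addRecord(new ByteArrayInputStream(new byte[] {1, 2, 3}));
    // reads are served from the LocalCache when present, else via S3Backend#read
    DataRecord same = ds.getRecord(rec.getIdentifier());
    ds.close();                                   // shuts down the AmazonS3Client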

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+/**
+ * An Amazon S3 data store.
+ */
+public class S3DataStore extends CachingDataStore {
+
+    @Override
+    protected Backend createBackend() {
+        return new S3Backend();
+    }
+
+    @Override
+    protected String getMarkerFile() {
+        return "s3.init.done";
+    }
+
+}
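
S3DataStore shows the extension pattern: a concrete store contributes its
Backend plus a marker-file name (presumably used by CachingDataStore to remember
that the cache-to-backend upload has completed). The test-only InMemoryDataStore
added in this commit follows the same shape; a sketch along those lines, with a
hypothetical marker-file name:

    public class InMemoryDataStore extends CachingDataStore {

        @Override
        protected Backend createBackend() {
            return new InMemoryBackend(); // test backend added in this commit
        }

        @Override
        protected String getMarkerFile() {
            return "mem.init.done"; // hypothetical marker-file name
        }
    }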

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.jackrabbit.aws.ext.ds.TestCaseBase;
+import org.apache.jackrabbit.aws.ext.ds.TestInMemDs;
+import org.apache.jackrabbit.aws.ext.ds.TestInMemDsCacheOff;
+import org.apache.jackrabbit.aws.ext.ds.TestS3Ds;
+import org.apache.jackrabbit.aws.ext.ds.TestS3DsCacheOff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test suite that includes all test cases for this module.
+ */
+public class TestAll extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestAll.class);
+
+    /**
+     * TestAll suite that executes all tests inside this module. To run the
+     * test cases against Amazon S3, pass the AWS configuration properties
+     * file as a system property, e.g. -Dconfig=/opt/cq/aws.properties. A
+     * sample aws.properties file is located at src/test/resources/aws.properties.
+     */
+    public static Test suite() {
+        TestSuite suite = new TestSuite("S3 tests");
+        suite.addTestSuite(TestLocalCache.class);
+        suite.addTestSuite(TestInMemDs.class);
+        suite.addTestSuite(TestInMemDsCacheOff.class);
+        String config = System.getProperty(TestCaseBase.CONFIG);
+        LOG.info("config= " + config);
+        if (config != null && !"".equals(config.trim())) {
+            suite.addTestSuite(TestS3Ds.class);
+            suite.addTestSuite(TestS3DsCacheOff.class);
+        }
+        return suite;
+    }
+}
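
As the suite() javadoc notes, the S3-backed tests are only added when the config
system property is set; for example (assuming the property is passed through to
the test JVM):

    mvn test -Dconfig=/opt/cq/aws.properties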