jackrabbit-commits mailing list archives

From thom...@apache.org
Subject svn commit: r1521876 [1/2] - in /jackrabbit/trunk/jackrabbit-aws-ext: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/jackrabbit/ src/main/java/org/apache/jackrabbit/aws/ src/main/java/org/apache/j...
Date Wed, 11 Sep 2013 14:35:03 GMT
Author: thomasm
Date: Wed Sep 11 14:35:02 2013
New Revision: 1521876

URL: http://svn.apache.org/r1521876
Log:
JCR-3651 S3 Datastore implementation (initial commit)

Added:
    jackrabbit/trunk/jackrabbit-aws-ext/   (with props)
    jackrabbit/trunk/jackrabbit-aws-ext/README.txt
    jackrabbit/trunk/jackrabbit-aws-ext/pom.xml
    jackrabbit/trunk/jackrabbit-aws-ext/src/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestLocalCache.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/aws.properties
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/log4j.properties
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml

Propchange: jackrabbit/trunk/jackrabbit-aws-ext/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Sep 11 14:35:02 2013
@@ -0,0 +1,10 @@
+target
+*.iws
+*.ipr
+*.iml
+junit*.properties
+.*
+*.xml.md5
+*-pom-snapshot-version
+.checkstyle
+

Added: jackrabbit/trunk/jackrabbit-aws-ext/README.txt
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/README.txt?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/README.txt (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/README.txt Wed Sep 11 14:35:02 2013
@@ -0,0 +1,28 @@
+====================================================
+Welcome to Jackrabbit Amazon Web Services Extension
+====================================================
+
+This is the Amazon Web Services extension component of the Apache Jackrabbit project.
+It contains the S3 data store, which stores binaries in Amazon S3 (http://aws.amazon.com/s3).
+
+====================================================
+Build Instructions
+====================================================
+To build the latest SNAPSHOT versions of all the components
+included here, run the following command with Maven 3:
+
+    mvn clean install
+
+To run the test cases that store data in an S3 bucket, pass an AWS config file via a system property, for example:
+
+    mvn clean install -DargLine="-Dconfig=/opt/cq/aws.properties"
+
+A sample aws.properties file is located at src/test/resources/aws.properties.
+
+====================================================
+Configuration Instructions
+====================================================
+The S3 data store is configured through an aws.properties file, referenced from the repository configuration:
+    <DataStore class="org.apache.jackrabbit.aws.ext.ds.S3DataStore">
+        <param name="config" value="${rep.home}/aws.properties"/>
+    </DataStore>
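
For reference, a minimal aws.properties sketch; the property names below are the
ones read by S3Constants and Utils.openService in this commit, and all values
are placeholders (timeouts in milliseconds, values illustrative only):

    accessKey=<aws-access-key>
    secretKey=<aws-secret-key>
    s3Bucket=<bucket-name>
    s3Region=<bucket-region>
    connectionTimeout=120000
    socketTimeout=120000
    maxConnections=10
    maxErrorRetry=10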

Added: jackrabbit/trunk/jackrabbit-aws-ext/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/pom.xml?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/pom.xml (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/pom.xml Wed Sep 11 14:35:02 2013
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd ">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- ====================================================================== -->
+    <!-- P R O J E C T D E S C R I P T I O N -->
+    <!-- ====================================================================== -->
+    <parent>
+        <groupId>org.apache.jackrabbit</groupId>
+        <artifactId>jackrabbit-parent</artifactId>
+        <version>2.8-SNAPSHOT</version>
+        <relativePath>../jackrabbit-parent/pom.xml</relativePath>
+    </parent>
+    <artifactId>jackrabbit-aws-ext</artifactId>
+    <name>Jackrabbit AWS Extension</name>
+    <description>Jackrabbit extension for Amazon Web Services</description>
+
+    <!-- ====================================================================== -->
+    <!-- D E P E N D E N C I E S -->
+    <!-- ====================================================================== -->
+    <dependencies>
+        <dependency>
+            <groupId>javax.jcr</groupId>
+            <artifactId>jcr</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-jcr-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>1.5.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <includes>
+                        <include>**/aws/**/TestAll.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.data.LazyFileInputStream;
+import org.apache.jackrabbit.util.TransientFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements an LRU cache used by {@link CachingDataStore}. If the
+ * cache size exceeds the limit, the cache enters purge mode. In purge mode any
+ * cache operation is a no-op. After a purge the cache size drops below
+ * cachePurgeResizeFactor * the maximum size.
+ */
+public class LocalCache {
+
+    /**
+     * Logger instance.
+     */
+    static final Logger LOG = LoggerFactory.getLogger(LocalCache.class);
+
+    /**
+     * The file names of the files that need to be deleted.
+     */
+    final Set<String> toBeDeleted = new HashSet<String>();
+
+    /**
+     * The file name to file size LRU cache.
+     */
+    LRUCache cache;
+
+    /**
+     * The directory where the files are created.
+     */
+    private final File directory;
+
+    /**
+     * The directory where tmp files are created.
+     */
+    private final File tmp;
+
+    /**
+     * The maximum size of cache in bytes.
+     */
+    private long maxSize;
+
+    /**
+     * If true, the cache is in purge mode and unavailable; all operations
+     * are no-ops.
+     */
+    private volatile boolean purgeMode;
+
+    /**
+     * Builds an LRU cache of the files located at 'path', using the
+     * lastModified property of each file to order the cache. If the cache
+     * size exceeds the limit, the cache enters purge mode, in which any
+     * cache operation is a no-op.
+     *
+     * @param path file system path
+     * @param tmpPath temporary directory used by the cache.
+     * @param maxSize maximum size of the cache in bytes.
+     * @param cachePurgeTrigFactor factor that triggers purge mode: if the
+     * current size exceeds (cachePurgeTrigFactor * maxSize), the cache goes
+     * into auto-purge mode.
+     * @param cachePurgeResizeFactor after a purge, the cache size will be
+     * just below (cachePurgeResizeFactor * maxSize).
+     * @throws RepositoryException
+     */
+    public LocalCache(final String path, final String tmpPath,
+            final long maxSize, final double cachePurgeTrigFactor,
+            final double cachePurgeResizeFactor) throws RepositoryException {
+        this.maxSize = maxSize;
+        directory = new File(path);
+        tmp = new File(tmpPath);
+        cache = new LRUCache(maxSize, cachePurgeTrigFactor,
+            cachePurgeResizeFactor);
+        ArrayList<File> allFiles = new ArrayList<File>();
+
+        Iterator<File> it = FileUtils.iterateFiles(directory, null, true);
+        while (it.hasNext()) {
+            File f = it.next();
+            allFiles.add(f);
+        }
+        Collections.sort(allFiles, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                long l1 = o1.lastModified(), l2 = o2.lastModified();
+                return l1 < l2 ? -1 : l1 > l2 ? 1 : 0;
+            }
+        });
+        String dataStorePath = directory.getAbsolutePath();
+        long time = System.currentTimeMillis();
+        int count = 0;
+        int deletecount = 0;
+        for (File f : allFiles) {
+            if (f.exists()) {
+                long length = f.length();
+                String name = f.getPath();
+                if (name.startsWith(dataStorePath)) {
+                    name = name.substring(dataStorePath.length());
+                }
+                // convert to java path format
+                name = name.replace("\\", "/");
+                if (name.startsWith("/") || name.startsWith("\\")) {
+                    name = name.substring(1);
+                }
+                if ((cache.currentSizeInBytes + length) < cache.maxSizeInBytes) {
+                    count++;
+                    cache.put(name, length);
+                } else {
+                    if (tryDelete(name)) {
+                        deletecount++;
+                    }
+                }
+                long now = System.currentTimeMillis();
+                if (now > time + 5000) {
+                    LOG.info("Processed {}/{} files", count + deletecount,
+                        allFiles.size());
+                    time = now;
+                }
+            }
+        }
+        LOG.info("Cached {}/{} files, currentSizeInBytes = {}", count,
+            allFiles.size(), cache.currentSizeInBytes);
+        LOG.info("Deleted {}/{} files.", deletecount, allFiles.size());
+    }
+
+    /**
+     * Stores an item in the cache and returns an input stream. If the cache
+     * is in purge mode or the file doesn't exist, an input stream from a
+     * {@link TransientFileFactory#createTransientFile(String, String, File)}
+     * is returned. Otherwise an input stream from the cached file is
+     * returned. This method doesn't close the incoming input stream.
+     *
+     * @param fileName the cache key.
+     * @param in the input stream.
+     * @return the (new) input stream.
+     */
+    public synchronized InputStream store(String fileName, final InputStream in)
+            throws IOException {
+        fileName = fileName.replace("\\", "/");
+        File f = getFile(fileName);
+        long length = 0;
+        if (!f.exists() || isInPurgeMode()) {
+            OutputStream out = null;
+            File transFile = null;
+            try {
+                TransientFileFactory tff = TransientFileFactory.getInstance();
+                transFile = tff.createTransientFile("s3-", "tmp", tmp);
+                out = new BufferedOutputStream(new FileOutputStream(transFile));
+                length = IOUtils.copyLarge(in, out);
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+            // rename the file to local fs cache
+            if (canAdmitFile(length)
+                && (f.getParentFile().exists() || f.getParentFile().mkdirs())
+                && transFile.renameTo(f) && f.exists()) {
+                if (transFile.exists() && !transFile.delete()) {
+                    LOG.warn("tmp file = " + transFile.getAbsolutePath()
+                        + " not deleted successfully");
+                }
+                transFile = null;
+                toBeDeleted.remove(fileName);
+                if (cache.get(fileName) == null) {
+                    cache.put(fileName, f.length());
+                }
+            } else {
+                f = transFile;
+            }
+        } else {
+            // f.exists and not in purge mode
+            f.setLastModified(System.currentTimeMillis());
+            toBeDeleted.remove(fileName);
+            if (cache.get(fileName) == null) {
+                cache.put(fileName, f.length());
+            }
+        }
+        cache.tryPurge();
+        return new LazyFileInputStream(f);
+    }
+
+    /**
+     * Stores an item along with its file in the cache. The cache size is
+     * increased by {@link File#length()}. If the file already exists in the
+     * cache, {@link File#setLastModified(long)} is updated with the current
+     * time.
+     *
+     * @param fileName the cache key.
+     * @param src file to be added to the cache.
+     * @throws IOException
+     */
+    public synchronized void store(String fileName, final File src)
+            throws IOException {
+        fileName = fileName.replace("\\", "/");
+        File dest = getFile(fileName);
+        File parent = dest.getParentFile();
+        if (src.exists() && !dest.exists() && !src.equals(dest)
+            && canAdmitFile(src.length())
+            && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest))) {
+            toBeDeleted.remove(fileName);
+            if (cache.get(fileName) == null) {
+                cache.put(fileName, dest.length());
+            }
+
+        } else if (dest.exists()) {
+            dest.setLastModified(System.currentTimeMillis());
+            toBeDeleted.remove(fileName);
+            if (cache.get(fileName) == null) {
+                cache.put(fileName, dest.length());
+            }
+        }
+        cache.tryPurge();
+    }
+
+    /**
+     * Returns the input stream from the cache, or null if the item is not in
+     * the cache.
+     *
+     * @param fileName name of the file.
+     * @return the stream, or null.
+     */
+    public InputStream getIfStored(String fileName) throws IOException {
+
+        fileName = fileName.replace("\\", "/");
+        File f = getFile(fileName);
+        synchronized (this) {
+            if (!f.exists() || isInPurgeMode()) {
+                log("purgeMode true or file doesn't exist: getIfStored returned null");
+                return null;
+            }
+            f.setLastModified(System.currentTimeMillis());
+            return new LazyFileInputStream(f);
+        }
+    }
+
+    /**
+     * Deletes a file from the cache. The cache size is reduced by the file
+     * length. The method is a no-op if the file doesn't exist in the cache.
+     *
+     * @param fileName file name to be removed from the cache.
+     */
+    public synchronized void delete(String fileName) {
+        if (isInPurgeMode()) {
+            log("purgeMode true: delete returned");
+            return;
+        }
+        fileName = fileName.replace("\\", "/");
+        cache.remove(fileName);
+    }
+
+    /**
+     * Returns the length of the file if it exists in the cache, otherwise
+     * null.
+     * @param fileName name of the file.
+     */
+    public Long getFileLength(String fileName) {
+        fileName = fileName.replace("\\", "/");
+        File f = getFile(fileName);
+        synchronized (this) {
+            if (!f.exists() || isInPurgeMode()) {
+                log("purgeMode true or file doesn't exist: getFileLength returned null");
+                return null;
+            }
+            f.setLastModified(System.currentTimeMillis());
+            return f.length();
+        }
+    }
+
+    /**
+     * Closes the cache. The cache maintains the set of files that it could
+     * not delete successfully; this method makes another attempt to delete
+     * those files.
+     */
+    public void close() {
+        log("close");
+        deleteOldFiles();
+    }
+
+    /**
+     * Checks if the cache can admit a file of the given length.
+     * @param length length of the file.
+     * @return true if it can, false otherwise.
+     */
+    private synchronized boolean canAdmitFile(final long length) {
+        // order is important here
+        boolean value = !isInPurgeMode() && cache.canAdmitFile(length);
+        if (!value) {
+            log("cannot admit file of length=" + length
+                + " and currentSizeInBytes=" + cache.currentSizeInBytes);
+        }
+        return value;
+    }
+
+    /**
+     * Returns true if the cache is in purge mode, false otherwise.
+     */
+    synchronized boolean isInPurgeMode() {
+        return purgeMode || maxSize == 0;
+    }
+
+    /**
+     * Sets the purge mode. If set to true, all cache operations become
+     * no-ops. If set to false, the cache is fully available again.
+     *
+     * @param purgeMode purge mode
+     */
+    synchronized void setPurgeMode(final boolean purgeMode) {
+        this.purgeMode = purgeMode;
+    }
+
+    File getFile(final String fileName) {
+        return new File(directory, fileName);
+    }
+
+    private void deleteOldFiles() {
+        int initialSize = toBeDeleted.size();
+        int count = 0;
+        for (String n : new ArrayList<String>(toBeDeleted)) {
+            if (tryDelete(n)) {
+                count++;
+            }
+        }
+        LOG.info("deleted [" + count + "]/[" + initialSize + "] files");
+    }
+
+    /**
+     * Tries to delete a file. If the file cannot be deleted for any reason,
+     * it is added to the toBeDeleted list.
+     *
+     * @param fileName name of the file to delete.
+     * @return true if the file was deleted successfully, false otherwise.
+     */
+    boolean tryDelete(final String fileName) {
+        log("cache delete " + fileName);
+        File f = getFile(fileName);
+        if (f.exists() && f.delete()) {
+            log(fileName + "  deleted successfully");
+            toBeDeleted.remove(fileName);
+            while (true) {
+                f = f.getParentFile();
+                if (f.equals(directory) || f.list().length > 0) {
+                    break;
+                }
+                // delete empty parent folders (except the main directory)
+                f.delete();
+            }
+            return true;
+        } else if (f.exists()) {
+            LOG.info("not able to delete file = " + f.getAbsolutePath());
+            toBeDeleted.add(fileName);
+            return false;
+        }
+        return true;
+    }
+
+    static int maxSizeElements(final long bytes) {
+        // after a CQ installation, the average item in
+        // the data store is about 52 KB
+        int count = (int) (bytes / 65535);
+        count = Math.max(1024, count);
+        count = Math.min(64 * 1024, count);
+        return count;
+    }
+
+    static void log(final String s) {
+        LOG.debug(s);
+    }
+
+    /**
+     * An LRU-based extension of {@link LinkedHashMap}. The key is the file
+     * name and the value is the length of the file.
+     */
+    private class LRUCache extends LinkedHashMap<String, Long> {
+        private static final long serialVersionUID = 1L;
+
+        volatile long currentSizeInBytes;
+
+        final long maxSizeInBytes;
+
+        long cachePurgeResize;
+        
+        private long cachePurgeTrigSize;
+
+        public LRUCache(final long maxSizeInBytes,
+                final double cachePurgeTrigFactor,
+                final double cachePurgeResizeFactor) {
+            super(maxSizeElements(maxSizeInBytes), (float) 0.75, true);
+            this.maxSizeInBytes = maxSizeInBytes;
+            this.cachePurgeTrigSize = (long) (cachePurgeTrigFactor
+                * maxSizeInBytes);
+            this.cachePurgeResize = (long) (cachePurgeResizeFactor
+                * maxSizeInBytes);
+        }
+
+        /**
+         * Overrides {@link Map#remove(Object)} to delete the corresponding
+         * file from the file system.
+         */
+        @Override
+        public synchronized Long remove(final Object key) {
+            String fileName = (String) key;
+            fileName = fileName.replace("\\", "/");
+            Long flength = null;
+            if (tryDelete(fileName)) {
+                flength = super.remove(key);
+                if (flength != null) {
+                    log("cache entry { " + fileName + "} with size {" + flength
+                        + "} removed.");
+                    currentSizeInBytes -= flength.longValue();
+                }
+            } else if (!getFile(fileName).exists()) {
+                // second attempt. remove from cache if file doesn't exists
+                flength = super.remove(key);
+                if (flength != null) {
+                    log(" file not exists. cache entry { " + fileName
+                        + "} with size {" + flength + "} removed.");
+                    currentSizeInBytes -= flength.longValue();
+                }
+            }
+            return flength;
+        }
+
+        @Override
+        public synchronized Long put(final String key, final Long value) {
+            long flength = value.longValue();
+            currentSizeInBytes += flength;
+            return super.put(key.replace("\\", "/"), value);
+        }
+
+        /**
+         * Tries to purge the local cache: if the cache has exceeded the
+         * defined limit, a purge job is triggered in a separate thread.
+         */
+        synchronized void tryPurge() {
+            if (currentSizeInBytes > cachePurgeTrigSize && !isInPurgeMode()) {
+                setPurgeMode(true);
+                LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes
+                    + "] exceeds (cachePurgeTrigSize)["
+                    + cache.cachePurgeTrigSize + "]");
+                new Thread(new PurgeJob()).start();
+            }
+        }
+
+        /**
+         * Checks whether the cache can admit a file of the given length.
+         * @param length length of the file.
+         * @return true if cache size + length is less than maxSize.
+         */
+        synchronized boolean canAdmitFile(final long length) {
+            return cache.currentSizeInBytes + length < cache.maxSizeInBytes;
+        }
+    }
+
+    /**
+     * This class performs purging of local cache. It implements
+     * {@link Runnable} and should be invoked in a separate thread.
+     */
+    private class PurgeJob implements Runnable {
+        public PurgeJob() {
+        }
+
+        /**
+         * Purges the local cache until its size is less than
+         * cachePurgeResizeFactor * maxSize.
+         */
+        @Override
+        public void run() {
+            try {
+                synchronized (cache) {
+                    LOG.info(" cache purge job started");
+                    // first try to delete toBeDeleted files
+                    int initialSize = cache.size();
+                    for (String fileName : new ArrayList<String>(toBeDeleted)) {
+                        cache.remove(fileName);
+                    }
+                    Iterator<Map.Entry<String, Long>> itr = cache.entrySet().iterator();
+                    while (itr.hasNext()) {
+                        Map.Entry<String, Long> entry = itr.next();
+                        if (entry.getKey() != null) {
+                            if (cache.currentSizeInBytes > cache.cachePurgeResize) {
+                                itr.remove();
+
+                            } else {
+                                break;
+                            }
+                        }
+
+                    }
+                    LOG.info(" cache purge job completed: cleaned ["
+                        + (initialSize - cache.size())
+                        + "] files and currentSizeInBytes = [ "
+                        + cache.currentSizeInBytes + "]");
+                }
+            } catch (Exception e) {
+                LOG.error("error in purge jobs:", e);
+            } finally {
+                setPurgeMode(false);
+            }
+        }
+    }
+}
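
A minimal usage sketch of the LocalCache added above (class name, paths, sizes,
and the cache key are illustrative; error handling is omitted):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    import org.apache.jackrabbit.aws.ext.LocalCache;

    public class LocalCacheSketch {
        public static void main(String[] args) throws Exception {
            // 16 MB cache; purge triggers at 95% of maxSize and
            // resizes back down to 85% of maxSize
            LocalCache cache = new LocalCache("/tmp/ds", "/tmp/ds-tmp",
                16L * 1024 * 1024, 0.95d, 0.85d);
            InputStream in = new ByteArrayInputStream("hello".getBytes("UTF-8"));
            // store() copies the stream into the cache and returns a new
            // stream over the cached (or transient) file
            InputStream cached = cache.store("ab/cd/abcd1234", in);
            cached.close();
            // getIfStored() returns null on a cache miss or in purge mode
            InputStream again = cache.getIfStored("ab/cd/abcd1234");
            if (again != null) {
                again.close();
            }
            cache.close();
        }
    }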

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+/**
+ * Defines Amazon S3 constants.
+ */
+public final class S3Constants {
+    
+    /**
+     * Amazon aws access key.
+     */
+    public static final String ACCESS_KEY = "accessKey";
+
+    /**
+     * Amazon aws secret key.
+     */
+    public static final String SECRET_KEY = "secretKey";
+
+    /**
+     * Amazon aws S3 bucket.
+     */
+    public static final String S3_BUCKET = "s3Bucket";
+
+    /**
+     * Amazon aws S3 region.
+     */
+    public static final String S3_REGION = "s3Region";
+   
+    /**
+     * Private constructor so that the class cannot be instantiated from outside.
+     */
+    private S3Constants() {
+
+    }
+
+}

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+/**
+ * Amazon S3 utilities.
+ */
+public final class Utils {
+
+    public static final String DEFAULT_CONFIG_FILE = "aws.properties";
+    
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+    private static final String DELETE_CONFIG_SUFFIX = ";burn";
+
+    /**
+     * Private constructor so that the class cannot be instantiated from outside.
+     */
+    private Utils() {
+
+    }
+
+    /**
+     * Creates an AmazonS3Client from the given properties.
+     *
+     * @param prop properties used to configure the {@link AmazonS3Client}
+     * @return {@link AmazonS3Client}
+     */
+    public static AmazonS3Client openService(final Properties prop) {
+        AWSCredentials credentials = new BasicAWSCredentials(
+            prop.getProperty(S3Constants.ACCESS_KEY),
+            prop.getProperty(S3Constants.SECRET_KEY));
+        int connectionTimeOut = Integer.parseInt(prop.getProperty("connectionTimeout"));
+        int socketTimeOut = Integer.parseInt(prop.getProperty("socketTimeout"));
+        int maxConnections = Integer.parseInt(prop.getProperty("maxConnections"));
+        int maxErrorRetry = Integer.parseInt(prop.getProperty("maxErrorRetry"));
+        ClientConfiguration cc = new ClientConfiguration();
+        cc.setConnectionTimeout(connectionTimeOut);
+        cc.setSocketTimeout(socketTimeOut);
+        cc.setMaxConnections(maxConnections);
+        cc.setMaxErrorRetry(maxErrorRetry);
+        return new AmazonS3Client(credentials, cc);
+    }
+
+    /**
+     * Deletes an S3 bucket. This method first deletes all objects from the
+     * bucket and then deletes the empty bucket itself.
+     *
+     * @param prop properties used to configure the {@link AmazonS3Client}
+     * and identify the bucket to delete.
+     */
+    public static void deleteBucket(final Properties prop) throws IOException {
+        AmazonS3Client s3service = openService(prop);
+        String bucketName = prop.getProperty(S3Constants.S3_BUCKET);
+        if (!s3service.doesBucketExist(bucketName)) {
+            return;
+        }
+        ObjectListing prevObjectListing = s3service.listObjects(bucketName);
+        while (true) {
+            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                    s3ObjSumm.getKey()));
+            }
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                    bucketName);
+                delObjsReq.setKeys(deleteList);
+                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                if (dobjs.getDeletedObjects().size() != deleteList.size()) {
+                    throw new IOException(
+                        "Incomplete delete object request: only "
+                            + dobjs.getDeletedObjects().size() + " out of "
+                            + deleteList.size() + " objects were deleted");
+                }
+                LOG.info(deleteList.size()
+                        + " records deleted from datastore");
+            }
+            if (!prevObjectListing.isTruncated()) {
+                break;
+            }
+            prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+        }
+        s3service.deleteBucket(bucketName);
+        s3service.shutdown();
+    }
+
+    /**
+     * Read a configuration properties file. If the file name ends with ";burn",
+     * the file is deleted after reading.
+     * 
+     * @param fileName the properties file name
+     * @return the properties
+     * @throws IOException if the file doesn't exist
+     */
+    public static Properties readConfig(String fileName) throws IOException {
+        boolean delete = false;
+        if (fileName.endsWith(DELETE_CONFIG_SUFFIX)) {
+            delete = true;
+            fileName = fileName.substring(0, fileName.length()
+                - DELETE_CONFIG_SUFFIX.length());
+        }
+        if (!new File(fileName).exists()) {
+            throw new IOException("Config file not found: " + fileName);
+        }
+        Properties prop = new Properties();
+        InputStream in = null;
+        try {
+            in = new FileInputStream(fileName);
+            prop.load(in);
+        } finally {
+            if (in != null) {
+                in.close();
+            }
+            if (delete) {
+                deleteIfPossible(new File(fileName));
+            }
+        }
+        return prop;
+    }
+
+    private static void deleteIfPossible(final File file) {
+        boolean deleted = file.delete();
+        if (!deleted) {
+            LOG.warn("Could not delete " + file.getAbsolutePath());
+        }
+    }
+
+}
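
A short sketch of how these utilities fit together (class name and config path
are illustrative):

    import java.util.Properties;

    import org.apache.jackrabbit.aws.ext.S3Constants;
    import org.apache.jackrabbit.aws.ext.Utils;

    import com.amazonaws.services.s3.AmazonS3Client;

    public class UtilsSketch {
        public static void main(String[] args) throws Exception {
            // readConfig() throws an IOException if the file is missing;
            // a ";burn" suffix deletes the file after reading
            Properties props = Utils.readConfig("/opt/cq/aws.properties");
            AmazonS3Client s3 = Utils.openService(props);
            String bucket = props.getProperty(S3Constants.S3_BUCKET);
            System.out.println(bucket + " exists: "
                + s3.doesBucketExist(bucket));
            s3.shutdown();
        }
    }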

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * This interface defines a backend that can be plugged into
+ * {@link CachingDataStore}.
+ */
+public interface Backend {
+
+    /**
+     * Initializes the backend with the given configuration.
+     *
+     * @param store {@link CachingDataStore}
+     * @param homeDir path of the repository home directory.
+     * @param config path of the configuration property file.
+     * @throws DataStoreException
+     */
+    void init(CachingDataStore store, String homeDir, String config)
+            throws DataStoreException;
+
+    /**
+     * Returns an input stream for the record identified by identifier.
+     *
+     * @param identifier identifier of the record.
+     * @return input stream of the record.
+     * @throws DataStoreException if the record is not found or on any error.
+     */
+    InputStream read(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Returns the length of the record identified by identifier.
+     *
+     * @param identifier identifier of the record.
+     * @return length of the record.
+     * @throws DataStoreException if the record is not found or on any error.
+     */
+    long getLength(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Returns the last-modified time of the record identified by identifier.
+     *
+     * @param identifier identifier of the record.
+     * @return last-modified time of the record.
+     * @throws DataStoreException if the record is not found or on any error.
+     */
+    long getLastModified(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Stores a file in the backend, with the identifier used as the key. If
+     * the key already exists, the timestamp of the key is updated.
+     *
+     * @param identifier key of the file
+     * @param file file to be stored in the backend.
+     * @throws DataStoreException on any error.
+     */
+    void write(DataIdentifier identifier, File file) throws DataStoreException;
+
+    /**
+     * Returns the identifiers of all records that exist in the backend.
+     * @return iterator over all identifiers
+     * @throws DataStoreException
+     */
+    Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException;
+
+    /**
+     * Updates the timestamp of the record identified by identifier if
+     * minModifiedDate is greater than the record's lastModified, otherwise a
+     * no-op.
+     *
+     * @throws DataStoreException if the record is not found.
+     */
+    void touch(DataIdentifier identifier, long minModifiedDate)
+            throws DataStoreException;
+
+    /**
+     * Checks whether the record exists in the backend.
+     * @param identifier identifier to be checked.
+     * @return true if the record exists, false otherwise.
+     * @throws DataStoreException
+     */
+    boolean exists(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Closes the backend and releases resources, such as database
+     * connections, if any.
+     * @throws DataStoreException
+     */
+    void close() throws DataStoreException;
+
+    /**
+     * Deletes all records that are older than the given timestamp.
+     * @param timestamp
+     * @return list of identifiers that were deleted.
+     * @throws DataStoreException
+     */
+    List<DataIdentifier> deleteAllOlderThan(long timestamp) throws DataStoreException;
+
+    /**
+     * Deletes the record identified by identifier. No-op if the identifier
+     * is not found.
+     * @param identifier
+     * @throws DataStoreException
+     */
+    void deleteRecord(DataIdentifier identifier) throws DataStoreException;
+}
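
To illustrate the contract, a skeletal in-memory sketch of two of the methods
(the class and its name are hypothetical, and it is left abstract so the
remaining methods, which follow the same pattern, can be omitted; the complete
in-memory implementation lives under src/test):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.jackrabbit.aws.ext.ds.Backend;
    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.apache.jackrabbit.core.data.DataStoreException;

    public abstract class MapBackendSketch implements Backend {

        private final Map<DataIdentifier, byte[]> data =
                new HashMap<DataIdentifier, byte[]>();

        public InputStream read(DataIdentifier identifier)
                throws DataStoreException {
            byte[] bytes = data.get(identifier);
            if (bytes == null) {
                // the contract requires an exception for a missing record
                throw new DataStoreException("No record: " + identifier);
            }
            return new ByteArrayInputStream(bytes);
        }

        public boolean exists(DataIdentifier identifier) {
            return data.containsKey(identifier);
        }
    }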

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.InputStream;
+
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * A CachingDataRecord that stores a reference to a {@link CachingDataStore}.
+ * This class doesn't store any attributes itself; attributes are fetched on
+ * demand from the {@link CachingDataStore}.
+ */
+public class CachingDataRecord extends AbstractDataRecord {
+
+    private final CachingDataStore store;
+
+    public CachingDataRecord(CachingDataStore store, DataIdentifier identifier) {
+        super(store, identifier);
+        this.store = store;
+    }
+
+    @Override
+    public long getLastModified() {
+        try {
+            return store.getLastModified(getIdentifier());
+        } catch (DataStoreException dse) {
+            return 0;
+        }
+    }
+
+    @Override
+    public long getLength() throws DataStoreException {
+        return store.getLength(getIdentifier());
+    }
+
+    @Override
+    public InputStream getStream() throws DataStoreException {
+        return store.getStream(getIdentifier());
+    }
+
+}
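
The CachingDataStore added below keys records such as this one by a SHA-1
digest of their content; a standalone sketch of that derivation (class name
and sample content are illustrative):

    import java.security.MessageDigest;

    public class DigestSketch {
        public static void main(String[] args) throws Exception {
            byte[] content = "binary content".getBytes("UTF-8");
            // SHA-1 over the full stream, hex-encoded, becomes the
            // DataIdentifier under which the record is stored
            MessageDigest digest = MessageDigest.getInstance("SHA-1");
            byte[] hash = digest.digest(content);
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            System.out.println("identifier: " + hex);
        }
    }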

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.ref.WeakReference;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.aws.ext.LocalCache;
+import org.apache.jackrabbit.core.data.AbstractDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A caching data store that consists of a {@link LocalCache} and a
+ * {@link Backend}. The {@link Backend} is the single source of truth. All
+ * methods first try to fetch information from the {@link LocalCache}. If a
+ * record is not available in the {@link LocalCache}, it is fetched from the
+ * {@link Backend} and saved to the {@link LocalCache} for further access.
+ * This class is also designed to work without a {@link LocalCache}, in which
+ * case all information is fetched from the {@link Backend}. To disable the
+ * {@link LocalCache}, set {@link #setCacheSize(long)} to 0.
+ * Configuration:
+ *
+ * <pre>
+ * &lt;DataStore class="org.apache.jackrabbit.aws.ext.ds.CachingDataStore">
+ *
+ *     &lt;param name="{@link #setPath(String) path}" value="/data/datastore"/>
+ *     &lt;param name="{@link #setConfig(String) config}" value="${rep.home}/backend.properties"/>
+ *     &lt;param name="{@link #setCacheSize(long) cacheSize}" value="68719476736"/>
+ *     &lt;param name="{@link #setSecret(String) secret}" value="123456"/>
+ *     &lt;param name="{@link #setCachePurgeTrigFactor(double) cachePurgeTrigFactor}" value="0.95d"/>
+ *     &lt;param name="{@link #setCachePurgeResizeFactor(double) cachePurgeResizeFactor}" value="0.85d"/>
+ *     &lt;param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
+ * &lt;/DataStore>
+ * </pre>
+ */
+public abstract class CachingDataStore extends AbstractDataStore implements
+        MultiDataStoreAware {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(CachingDataStore.class);
+
+    /**
+     * The digest algorithm used to uniquely identify records.
+     */
+    private static final String DIGEST = "SHA-1";
+
+    private static final String DS_STORE = ".DS_Store";
+    
+    /**
+     * Name of the directory used for temporary files. Must be at least 3
+     * characters.
+     */
+    private static final String TMP = "tmp";
+    
+    /**
+     * All data identifiers that are currently in use are in this set until they
+     * are garbage collected.
+     */
+    protected Map<DataIdentifier, WeakReference<DataIdentifier>> inUse = 
+            Collections.synchronizedMap(new WeakHashMap<DataIdentifier, 
+                WeakReference<DataIdentifier>>());
+
+    protected Backend backend;
+
+    /**
+     * The minimum size of an object that should be stored in this data store.
+     */
+    private int minRecordLength = 16 * 1024;
+
+    private String path;
+
+    private File directory;
+
+    private File tmpDir;
+
+    private String secret;
+
+    /**
+     * The optional backend configuration.
+     */
+    private String config;
+
+    /**
+     * The minimum modified date. If a file is accessed (read or write) with a
+     * modified date older than this value, the modified date is updated to the
+     * current time.
+     */
+    private long minModifiedDate;
+
+    /**
+     * Cache purge trigger factor. The cache enters auto-purge mode if its
+     * current size is greater than cachePurgeTrigFactor * cacheSize.
+     */
+    private double cachePurgeTrigFactor = 0.95d;
+
+    /**
+     * Cache resize factor. After an auto-purge, the cache's current size will
+     * be just below cachePurgeResizeFactor * cacheSize.
+     */
+    private double cachePurgeResizeFactor = 0.85d;
+
+    /**
+     * The number of bytes in the cache. The default value is 64 GB.
+     */
+    private long cacheSize = 64L * 1024 * 1024 * 1024;
+
+    /**
+     * The local file system cache.
+     */
+    private LocalCache cache;
+
+    abstract Backend createBackend();
+
+    abstract String getMarkerFile();
+
+    /**
+     * Initializes the data store. If the path is not set, &lt;repository
+     * home&gt;/repository/datastore is used. This directory is automatically
+     * created if it does not yet exist. During the first initialization, all
+     * files from the local data store are uploaded to the backend, and the
+     * local data store then acts as a local cache.
+     */
+    @Override
+    public void init(String homeDir) throws RepositoryException {
+        if (path == null) {
+            path = homeDir + "/repository/datastore";
+        }
+        directory = new File(path);
+        try {
+            mkdirs(directory);
+        } catch (IOException e) {
+            throw new DataStoreException("Could not create directory "
+                + directory.getAbsolutePath(), e);
+        }
+        tmpDir = new File(homeDir, "/repository/s3tmp");
+        try {
+            if (!mkdirs(tmpDir)) {
+                FileUtils.cleanDirectory(tmpDir);
+                LOG.info("tmp = " + tmpDir.getPath() + " cleaned");
+            }
+        } catch (IOException e) {
+            throw new DataStoreException("Could not create directory "
+                + tmpDir.getAbsolutePath(), e);
+        }
+        LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor
+            + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor);
+        backend = createBackend();
+        backend.init(this, path, config);
+        String markerFileName = getMarkerFile();
+        if (markerFileName != null) {
+            // create marker file in homeDir to avoid deletion in cache cleanup.
+            File markerFile = new File(homeDir, markerFileName);
+            if (!markerFile.exists()) {
+                LOG.info("load files from local cache");
+                loadFilesFromCache();
+                try {
+                    markerFile.createNewFile();
+                } catch (IOException e) {
+                    throw new DataStoreException(
+                        "Could not create marker file "
+                            + markerFile.getAbsolutePath(), e);
+                }
+            } else {
+                LOG.info("marker file = " + markerFile.getAbsolutePath()
+                    + " exists");
+            }
+        }
+        cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
+            cachePurgeTrigFactor, cachePurgeResizeFactor);
+    }
+
+    /**
+     * Creates a new data record in the {@link Backend}. The stream is first
+     * consumed: the contents are saved in a temporary file and the SHA-1
+     * message digest of the stream is calculated. If a record with the same
+     * SHA-1 digest (and length) is found, it is returned. Otherwise a new
+     * record is created in the {@link Backend} and the temporary file is
+     * moved into place in the {@link LocalCache}.
+     * 
+     * @param input binary stream
+     * @return {@link CachingDataRecord}
+     * @throws DataStoreException if the record could not be created.
+     */
+    @Override
+    public DataRecord addRecord(InputStream input) throws DataStoreException {
+        File temporary = null;
+        try {
+            temporary = newTemporaryFile();
+            DataIdentifier tempId = new DataIdentifier(temporary.getName());
+            usesIdentifier(tempId);
+            // Copy the stream to the temporary file and calculate the
+            // stream length and the message digest of the stream
+            MessageDigest digest = MessageDigest.getInstance(DIGEST);
+            OutputStream output = new DigestOutputStream(new FileOutputStream(
+                temporary), digest);
+            try {
+                IOUtils.copyLarge(input, output);
+            } finally {
+                output.close();
+            }
+            DataIdentifier identifier = new DataIdentifier(
+                encodeHexString(digest.digest()));
+            synchronized (this) {
+                usesIdentifier(identifier);
+                backend.write(identifier, temporary);
+                String fileName = getFileName(identifier);
+                cache.store(fileName, temporary);
+            }
+            // this will also make sure that
+            // tempId is not garbage collected until here
+            inUse.remove(tempId);
+            return new CachingDataRecord(this, identifier);
+        } catch (NoSuchAlgorithmException e) {
+            throw new DataStoreException(DIGEST + " not available", e);
+        } catch (IOException e) {
+            throw new DataStoreException("Could not add record", e);
+        } finally {
+            if (temporary != null) {
+                // try to delete - but it's not a big deal if we can't
+                temporary.delete();
+            }
+        }
+    }
+
+    /**
+     * Gets the data record for the given identifier, or null if the record
+     * does not exist in the {@link Backend}.
+     * 
+     * @param identifier identifier of the record.
+     * @return the {@link CachingDataRecord}, or null.
+     */
+    @Override
+    public DataRecord getRecordIfStored(DataIdentifier identifier)
+            throws DataStoreException {
+        synchronized (this) {
+            usesIdentifier(identifier);
+            if (!backend.exists(identifier)) {
+                return null;
+            }
+            backend.touch(identifier, minModifiedDate);
+            return new CachingDataRecord(this, identifier);
+        }
+    }
+
+    @Override
+    public void updateModifiedDateOnAccess(long before) {
+        LOG.info("minModifiedDate set to: " + before);
+        minModifiedDate = before;
+    }
+
+    /**
+     * Retrieves all identifiers from {@link Backend}.
+     */
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers()
+            throws DataStoreException {
+        return backend.getAllIdentifiers();
+    }
+
+    /**
+     * Deletes the record from the {@link Backend} and then from the
+     * {@link LocalCache}.
+     */
+    @Override
+    public void deleteRecord(DataIdentifier identifier)
+            throws DataStoreException {
+        String fileName = getFileName(identifier);
+        synchronized (this) {
+            backend.deleteRecord(identifier);
+            cache.delete(fileName);
+        }
+    }
+
+    @Override
+    public synchronized int deleteAllOlderThan(long min)
+            throws DataStoreException {
+        List<DataIdentifier> diList = backend.deleteAllOlderThan(min);
+        // remove entries from local cache
+        for (DataIdentifier identifier : diList) {
+            cache.delete(getFileName(identifier));
+        }
+        return diList.size();
+    }
+
+    /**
+     * Gets the stream of the record from the {@link LocalCache}. If the
+     * record is not available in the {@link LocalCache}, it is fetched from
+     * the {@link Backend} and stored in the {@link LocalCache}; the stream
+     * is then returned from the cached record.
+     */
+    InputStream getStream(DataIdentifier identifier) throws DataStoreException {
+        InputStream in = null;
+        try {
+            String fileName = getFileName(identifier);
+            InputStream cached = cache.getIfStored(fileName);
+            if (cached != null) {
+                return cached;
+            }
+            in = backend.read(identifier);
+            return cache.store(fileName, in);
+        } catch (IOException e) {
+            throw new DataStoreException("IO Exception: " + identifier, e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    /**
+     * Returns the last modified time of the record from the {@link Backend},
+     * which is treated as the single source of truth.
+     */
+    long getLastModified(DataIdentifier identifier) throws DataStoreException {
+        LOG.info("accessed lastModified");
+        return backend.getLastModified(identifier);
+    }
+
+    /**
+     * Returns the length of the record from the {@link LocalCache} if
+     * available, otherwise retrieves it from the {@link Backend}.
+     */
+    long getLength(DataIdentifier identifier) throws DataStoreException {
+        String fileName = getFileName(identifier);
+        Long length = cache.getFileLength(fileName);
+        if (length != null) {
+            return length.longValue();
+        }
+        return backend.getLength(identifier);
+    }
+
+    @Override
+    protected byte[] getOrCreateReferenceKey() throws DataStoreException {
+        try {
+            return secret.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    /**
+     * Returns a unique temporary file to be used for creating a new data
+     * record.
+     */
+    private File newTemporaryFile() throws IOException {
+        return File.createTempFile(TMP, null, tmpDir);
+    }
+
+    /**
+     * Uploads all files from the local data store directory to the {@link Backend}.
+     */
+    private void loadFilesFromCache() throws RepositoryException {
+        ArrayList<File> files = new ArrayList<File>();
+        listRecursive(files, directory);
+        long totalSize = 0;
+        for (File f : files) {
+            totalSize += f.length();
+        }
+        long currentSize = 0;
+        long time = System.currentTimeMillis();
+        for (File f : files) {
+            long now = System.currentTimeMillis();
+            if (now > time + 5000) {
+                LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+                time = now;
+            }
+            currentSize += f.length();
+            String name = f.getName();
+            LOG.debug("upload file = " + name);
+            if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
+                && f.length() > 0) {
+                loadFileToBackEnd(f);
+            }
+        }
+        LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+    }
+
+    /**
+     * Traverses the directory recursively and populates the list with files.
+     */
+    private void listRecursive(List<File> list, File file) {
+        File[] files = file.listFiles();
+        if (files != null) {
+            for (File f : files) {
+                if (f.isDirectory()) {
+                    listRecursive(list, f);
+                } else {
+                    list.add(f);
+                }
+            }
+        }
+    }
+
+    /**
+     * Uploads a file from the {@link LocalCache} to the {@link Backend}.
+     * 
+     * @param f the file to upload.
+     * @throws DataStoreException
+     */
+    private void loadFileToBackEnd(File f) throws DataStoreException {
+        DataIdentifier identifier = new DataIdentifier(f.getName());
+        usesIdentifier(identifier);
+        backend.write(identifier, f);
+        LOG.debug(f.getName() + " uploaded.");
+    }
+
+    /**
+     * Derive file name from identifier.
+     */
+    private static String getFileName(DataIdentifier identifier) {
+        String name = identifier.toString();
+        name = name.substring(0, 2) + "/" + name.substring(2, 4) + "/"
+            + name.substring(4, 6) + "/" + name;
+        return name;
+    }
+
+    private void usesIdentifier(DataIdentifier identifier) {
+        inUse.put(identifier, new WeakReference<DataIdentifier>(identifier));
+    }
+
+    private static boolean mkdirs(File dir) throws IOException {
+        if (dir.exists()) {
+            if (dir.isFile()) {
+                throw new IOException("Can not create a directory "
+                    + "because a file exists with the same name: "
+                    + dir.getAbsolutePath());
+            }
+            return false;
+        }
+        boolean created = dir.mkdirs();
+        if (!created) {
+            throw new IOException("Could not create directory: "
+                + dir.getAbsolutePath());
+        }
+        return created;
+    }
+
+    @Override
+    public void clearInUse() {
+        inUse.clear();
+    }
+
+    @Override
+    public void close() throws DataStoreException {
+        cache.close();
+        backend.close();
+        cache = null;
+    }
+
+    /**
+     * Sets the configuration-based secret.
+     * 
+     * @param secret the secret used to sign reference binaries
+     */
+    public void setSecret(String secret) {
+        this.secret = secret;
+    }
+
+    /**
+     * Sets the minimum record length.
+     * 
+     * @param minRecordLength the length
+     */
+    public void setMinRecordLength(int minRecordLength) {
+        this.minRecordLength = minRecordLength;
+    }
+
+    /**
+     * Returns the minimum record length.
+     */
+    @Override
+    public int getMinRecordLength() {
+        return minRecordLength;
+    }
+
+    /**
+     * Return path of configuration properties.
+     * 
+     * @return path of configuration properties.
+     */
+    public String getConfig() {
+        return config;
+    }
+
+    /**
+     * Set the configuration properties path.
+     * 
+     * @param config path of configuration properties.
+     */
+    public void setConfig(String config) {
+        this.config = config;
+    }
+
+    /**
+     * @return the size of the {@link LocalCache}.
+     */
+    public long getCacheSize() {
+        return cacheSize;
+    }
+
+    /**
+     * Sets the size of the {@link LocalCache}.
+     * @param cacheSize the size of the {@link LocalCache}.
+     */
+    public void setCacheSize(long cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    /**
+     * @return the path of the {@link LocalCache}.
+     */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Sets the path of the {@link LocalCache}.
+     * @param path the path of the {@link LocalCache}.
+     */
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    /**
+     * @return Purge trigger factor of {@link LocalCache}.
+     */
+    public double getCachePurgeTrigFactor() {
+        return cachePurgeTrigFactor;
+    }
+
+    /**
+     * Set purge trigger factor of {@link LocalCache}.
+     * @param cachePurgeTrigFactor purge trigger factor.
+     */
+    public void setCachePurgeTrigFactor(double cachePurgeTrigFactor) {
+        this.cachePurgeTrigFactor = cachePurgeTrigFactor;
+    }
+
+    /**
+     * @return Purge resize factor of {@link LocalCache}.
+     */
+    public double getCachePurgeResizeFactor() {
+        return cachePurgeResizeFactor;
+    }
+
+    /**
+     * Set purge resize factor of {@link LocalCache}.
+     * @param cachePurgeResizeFactor purge resize factor.
+     */
+    public void setCachePurgeResizeFactor(double cachePurgeResizeFactor) {
+        this.cachePurgeResizeFactor = cachePurgeResizeFactor;
+    }
+
+}
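
A minimal, self-contained sketch of the content-addressing scheme used by
CachingDataStore above (SHA-1 digest as record identifier, three
two-character directory levels as in getFileName()). The class name and the
hex helper are illustrative only, not part of this commit:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.security.MessageDigest;

    public class ShardedNameDemo {

        // Mirrors getFileName(): the first three byte pairs of the hex
        // digest become directory levels, keeping each directory small.
        static String getFileName(String id) {
            return id.substring(0, 2) + "/" + id.substring(2, 4) + "/"
                + id.substring(4, 6) + "/" + id;
        }

        // Content-addressed identifier, as in addRecord(): the SHA-1
        // digest of the stream, hex-encoded.
        static String identifier(InputStream in) throws Exception {
            MessageDigest digest = MessageDigest.getInstance("SHA-1");
            byte[] buffer = new byte[8192];
            for (int n = in.read(buffer); n != -1; n = in.read(buffer)) {
                digest.update(buffer, 0, n);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        }

        public static void main(String[] args) throws Exception {
            String id = identifier(
                new ByteArrayInputStream("hello".getBytes("UTF-8")));
            System.out.println(getFileName(id)); // aa/f4/c6/aaf4c61d...
        }
    }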

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.jackrabbit.aws.ext.S3Constants;
+import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * A data store backend that stores data on Amazon S3.
+ */
+public class S3Backend implements Backend {
+
+    private static final String KEY_PREFIX = "dataStore_";
+
+    /**
+     * The default AWS bucket region.
+     */
+    private static final String DEFAULT_AWS_BUCKET_REGION = "us-standard";
+
+    /**
+     * Constants used to build endpoint host names for the various AWS
+     * regions.
+     */
+    private static final String AWSDOTCOM = "amazonaws.com";
+
+    private static final String S3 = "s3";
+
+    private static final String DOT = ".";
+
+    private static final String DASH = "-";
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+
+    private AmazonS3Client s3service;
+
+    private String bucket;
+
+    private TransferManager tmx;
+
+    private CachingDataStore store;
+
+    /**
+     * Initializes the S3 backend. It creates the AmazonS3Client and
+     * TransferManager from aws.properties, and creates the S3 bucket if it
+     * does not already exist.
+     */
+    @Override
+    public void init(CachingDataStore store, String homeDir, String config)
+            throws DataStoreException {
+        if (config == null) {
+            config = Utils.DEFAULT_CONFIG_FILE;
+        }
+        try {
+            Properties prop = Utils.readConfig(config);
+            LOG.debug("init");
+            this.store = store;
+            s3service = Utils.openService(prop);
+            bucket = prop.getProperty(S3Constants.S3_BUCKET);
+            String region = prop.getProperty(S3Constants.S3_REGION);
+            String endpoint = null;
+            if (!s3service.doesBucketExist(bucket)) {
+                if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+                    s3service.createBucket(bucket, Region.US_Standard);
+                    endpoint = S3 + DOT + AWSDOTCOM;
+                } else if (Region.EU_Ireland.toString().equals(region)) {
+                    s3service.createBucket(bucket, Region.EU_Ireland);
+                    endpoint = "s3-eu-west-1" + DOT + AWSDOTCOM;
+                } else {
+                    s3service.createBucket(bucket, region);
+                    endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
+                }
+                LOG.info("Created bucket: " + bucket + " in " + region);
+            } else {
+                LOG.info("Using bucket: " + bucket);
+                if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+                    endpoint = S3 + DOT + AWSDOTCOM;
+                } else if (Region.EU_Ireland.toString().equals(region)) {
+                    endpoint = "s3-eu-west-1" + DOT + AWSDOTCOM;
+                } else {
+                    endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
+                }
+            }
+            /*
+             * Set the endpoint explicitly to avoid redirection latency. If
+             * the endpoint is not set, the invocation first goes to the US
+             * Standard region, which redirects it to the correct location.
+             */
+            s3service.setEndpoint(endpoint);
+            LOG.info("S3 service endpoint: " + endpoint);
+            tmx = new TransferManager(s3service, createDefaultExecutorService());
+            LOG.debug("  done");
+        } catch (Exception e) {
+            LOG.debug("  error ", e);
+            throw new DataStoreException("Could not initialize S3 from "
+                + config, e);
+        }
+    }
+
+    /**
+     * Uploads the file to Amazon S3. If the file size is greater than 5 MB,
+     * this method uses multiple concurrent connections for the upload.
+     */
+    @Override
+    public void write(DataIdentifier identifier, File file)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        ObjectMetadata objectMetaData = null;
+        long start = System.currentTimeMillis();
+        LOG.debug("write {0} length {1}", identifier, file.length());
+        try {
+            // check if the same record already exists
+            try {
+                objectMetaData = s3service.getObjectMetadata(bucket, key);
+            } catch (AmazonServiceException ase) {
+                if (ase.getStatusCode() != 404) {
+                    throw ase;
+                }
+            }
+            if (objectMetaData != null) {
+                long l = objectMetaData.getContentLength();
+                if (l != file.length()) {
+                    throw new DataStoreException("Collision: " + key
+                        + " new length: " + file.length() + " old length: " + l);
+                }
+                LOG.debug(key + "   exists");
+                CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
+                    bucket, key);
+                copReq.setNewObjectMetadata(objectMetaData);
+                s3service.copyObject(copReq);
+                LOG.debug("lastModified of " + identifier.toString()
+                    + " updated successfully");
+                LOG.debug("   updated");
+            }
+        } catch (AmazonServiceException e) {
+            LOG.debug("   does not exist", e);
+            // not found - create it
+        }
+        if (objectMetaData == null) {
+            LOG.debug("   creating");
+            try {
+                // start multipart parallel upload using amazon sdk
+                Upload up = tmx.upload(new PutObjectRequest(bucket, key, file));
+                // wait for upload to finish
+                up.waitForUploadResult();
+                LOG.debug("   done");
+            } catch (Exception e2) {
+                LOG.debug("   could not upload", e2);
+                throw new DataStoreException("Could not upload " + key, e2);
+            }
+        }
+        LOG.debug("    ms: {0}", System.currentTimeMillis() - start);
+
+    }
+
+    /**
+     * Check if record identified by identifier exists in Amazon S3.
+     */
+    @Override
+    public boolean exists(DataIdentifier identifier) throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            LOG.debug("exists {0}", identifier);
+            ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
+                key);
+            if (objectMetaData != null) {
+                LOG.debug("  true");
+                return true;
+            }
+            return false;
+        } catch (AmazonServiceException e) {
+            if (e.getStatusCode() == 404) {
+                LOG.info("key [" + identifier.toString() + "] not found.");
+                return false;
+            }
+            throw new DataStoreException(
+                "Error occurred in getObjectMetadata for key ["
+                    + identifier.toString() + "]", e);
+        }
+    }
+
+    @Override
+    public void touch(DataIdentifier identifier, long minModifiedDate)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            if (minModifiedDate != 0) {
+                ObjectMetadata objectMetaData = s3service.getObjectMetadata(
+                    bucket, key);
+                if (objectMetaData.getLastModified().getTime() < minModifiedDate) {
+                    CopyObjectRequest copReq = new CopyObjectRequest(bucket,
+                        key, bucket, key);
+                    copReq.setNewObjectMetadata(objectMetaData);
+                    s3service.copyObject(copReq);
+                    LOG.debug("lastModified of " + identifier.toString()
+                        + " updated successfully");
+                }
+            }
+        } catch (Exception e) {
+            throw new DataStoreException(
+                "An Exception occurred while trying to set the last modified date of record "
+                    + identifier.toString(), e);
+        }
+    }
+
+    @Override
+    public InputStream read(DataIdentifier identifier)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            LOG.debug("read {" + identifier + "}");
+            S3Object object = s3service.getObject(bucket, key);
+            InputStream in = object.getObjectContent();
+            LOG.debug("  return");
+            return in;
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException("Object not found: " + key, e);
+        }
+    }
+
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers()
+            throws DataStoreException {
+        try {
+            LOG.debug("getAllIdentifiers");
+            Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
+            ObjectListing prevObjectListing = s3service.listObjects(bucket,
+                KEY_PREFIX);
+            while (true) {
+                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                    String id = getIdentifierName(s3ObjSumm.getKey());
+                    if (id != null) {
+                        ids.add(new DataIdentifier(id));
+                    }
+                }
+                if (!prevObjectListing.isTruncated()) {
+                    break;
+                }
+                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+            }
+            LOG.debug("  return");
+            return ids.iterator();
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException("Could not list objects", e);
+        }
+    }
+
+    @Override
+    public long getLastModified(DataIdentifier identifier)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+            return object.getLastModified().getTime();
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException(
+                "Could not getLastModified of dataIdentifier " + identifier, e);
+        }
+    }
+
+    @Override
+    public long getLength(DataIdentifier identifier) throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+            return object.getContentLength();
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException("Could not length of dataIdentifier "
+                + identifier, e);
+        }
+    }
+
+    @Override
+    public void deleteRecord(DataIdentifier identifier)
+            throws DataStoreException {
+        String key = getKeyName(identifier);
+        try {
+            s3service.deleteObject(bucket, key);
+        } catch (AmazonServiceException e) {
+            throw new DataStoreException(
+                "Could not delete dataIdentifier " + identifier, e);
+        }
+    }
+
+    @Override
+    public List<DataIdentifier> deleteAllOlderThan(long min)
+            throws DataStoreException {
+        LOG.info("deleteAllOlderThan " + new Date(min));
+        List<DataIdentifier> diDeleteList = new ArrayList<DataIdentifier>(30);
+        ObjectListing prevObjectListing = s3service.listObjects(bucket);
+        while (true) {
+            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+            for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                DataIdentifier identifier = new DataIdentifier(
+                    getIdentifierName(s3ObjSumm.getKey()));
+                if (!store.inUse.containsKey(identifier)
+                    && s3ObjSumm.getLastModified().getTime() < min) {
+                    LOG.info("add id :" + s3ObjSumm.getKey()
+                        + " to delete lists");
+                    deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                        s3ObjSumm.getKey()));
+                    diDeleteList.add(identifier);
+                }
+            }
+            if (deleteList.size() > 0) {
+                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                    bucket);
+                delObjsReq.setKeys(deleteList);
+                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                if (dobjs.getDeletedObjects().size() != deleteList.size()) {
+                    throw new DataStoreException(
+                        "Incomplete delete object request: only "
+                            + dobjs.getDeletedObjects().size() + " out of "
+                            + deleteList.size() + " were deleted");
+                }
+                LOG.info(deleteList.size() + " records deleted from datastore");
+            }
+            if (!prevObjectListing.isTruncated()) {
+                break;
+            }
+            prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+        }
+        LOG.info("deleteAllOlderThan  exit");
+        return diDeleteList;
+    }
+
+    @Override
+    public void close() {
+        s3service.shutdown();
+        s3service = null;
+        tmx = null;
+    }
+
+    /**
+     * Gets the S3 key for a data identifier; objects are stored in S3 under
+     * this key.
+     */
+    private static String getKeyName(DataIdentifier identifier) {
+        return KEY_PREFIX + identifier.toString();
+    }
+
+    /**
+     * Gets the data identifier name from an S3 key, or returns null if the
+     * key does not carry the data store prefix.
+     */
+    private static String getIdentifierName(String key) {
+        if (!key.startsWith(KEY_PREFIX)) {
+            return null;
+        }
+        return key.substring(KEY_PREFIX.length());
+    }
+
+    /**
+     * Returns a new thread pool configured with the default settings.
+     * 
+     * @return A new thread pool configured with the default settings.
+     */
+    private ThreadPoolExecutor createDefaultExecutorService() {
+        ThreadFactory threadFactory = new ThreadFactory() {
+            private int threadCount = 1;
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r);
+                thread.setContextClassLoader(getClass().getClassLoader());
+                thread.setName("s3-transfer-manager-worker-" + threadCount++);
+                return thread;
+            }
+        };
+        return (ThreadPoolExecutor) Executors.newFixedThreadPool(10,
+            threadFactory);
+    }
+}
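
A sketch of the region-to-endpoint mapping that S3Backend.init() performs,
extracted as a pure function for clarity. The EU_IRELAND constant stands in
for Region.EU_Ireland.toString(); its exact value comes from the AWS SDK, so
treat it as an assumption here:

    public class EndpointDemo {

        static final String EU_IRELAND = "eu-west-1"; // assumed value, see note above

        static String endpointFor(String region) {
            if ("us-standard".equals(region)) {
                return "s3.amazonaws.com";
            }
            if (EU_IRELAND.equals(region)) {
                return "s3-eu-west-1.amazonaws.com";
            }
            return "s3-" + region + ".amazonaws.com";
        }

        public static void main(String[] args) {
            System.out.println(endpointFor("us-standard"));    // s3.amazonaws.com
            System.out.println(endpointFor("ap-southeast-1")); // s3-ap-southeast-1.amazonaws.com
        }
    }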

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+/**
+ * An Amazon S3 data store.
+ */
+public class S3DataStore extends CachingDataStore {
+
+    @Override
+    protected Backend createBackend() {
+        return new S3Backend();
+    }
+
+    @Override
+    protected String getMarkerFile() {
+        return "s3.init.done";
+    }
+
+}
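
A minimal usage sketch for S3DataStore, assuming the standard Jackrabbit
DataStore API (DataRecord.getIdentifier(), DataRecord.getLength()); both
file paths below are placeholders:

    import java.io.ByteArrayInputStream;

    import org.apache.jackrabbit.aws.ext.ds.S3DataStore;
    import org.apache.jackrabbit.core.data.DataRecord;

    public class S3DataStoreUsage {

        public static void main(String[] args) throws Exception {
            S3DataStore store = new S3DataStore();
            // aws.properties as read by Utils.readConfig() (placeholder path)
            store.setConfig("/opt/cq/aws.properties");
            store.init("/path/to/repository/home"); // placeholder home dir
            DataRecord record = store.addRecord(
                new ByteArrayInputStream("hello".getBytes("UTF-8")));
            System.out.println(record.getIdentifier()
                + " length=" + record.getLength());
            store.close();
        }
    }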

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java?rev=1521876&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java Wed Sep 11 14:35:02 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.jackrabbit.aws.ext.ds.TestCaseBase;
+import org.apache.jackrabbit.aws.ext.ds.TestInMemDs;
+import org.apache.jackrabbit.aws.ext.ds.TestInMemDsCacheOff;
+import org.apache.jackrabbit.aws.ext.ds.TestS3Ds;
+import org.apache.jackrabbit.aws.ext.ds.TestS3DsCacheOff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test suite that includes all test cases for this module.
+ */
+public class TestAll extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestAll.class);
+
+    /**
+     * <code>TestAll</code> suite that executes all tests inside this module.
+     * To run the test cases against Amazon S3, pass the AWS configuration
+     * properties file as a system property, e.g.
+     * -Dconfig=/opt/cq/aws.properties. A sample aws.properties is located at
+     * src/test/resources/aws.properties.
+     */
+    public static Test suite() {
+        TestSuite suite = new TestSuite("S3 tests");
+        suite.addTestSuite(TestLocalCache.class);
+        suite.addTestSuite(TestInMemDs.class);
+        suite.addTestSuite(TestInMemDsCacheOff.class);
+        String config = System.getProperty(TestCaseBase.CONFIG);
+        LOG.info("config= " + config);
+        if (config != null && !"".equals(config.trim())) {
+            suite.addTestSuite(TestS3Ds.class);
+            suite.addTestSuite(TestS3DsCacheOff.class);
+        }
+        return suite;
+    }
+}
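
For completeness, a programmatic equivalent of passing
-Dconfig=/opt/cq/aws.properties when running the suite (JUnit 3 text runner;
the property key matches the javadoc above, and the file path is a
placeholder):

    public class RunS3Tests {

        public static void main(String[] args) {
            System.setProperty("config", "/opt/cq/aws.properties");
            junit.textui.TestRunner.run(
                org.apache.jackrabbit.aws.ext.TestAll.suite());
        }
    }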


