jackrabbit-commits mailing list archives

From dpfis...@apache.org
Subject svn commit: r1577127 [1/3] - in /jackrabbit/trunk: jackrabbit-aws-ext/ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/ jackrabbit-aws-ext/src/test/java/org/apache/jackra...
Date Thu, 13 Mar 2014 12:12:34 GMT
Author: dpfister
Date: Thu Mar 13 12:12:33 2014
New Revision: 1577127

URL: http://svn.apache.org/r1577127
Log:
JCR-3729 - S3 Datastore optimizations

Added:
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3TestDataStore.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/
    jackrabbit/trunk/jackrabbit-data/src/test/java/
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestCaseBase.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestInMemDs.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestInMemDsCacheOff.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java   (with props)
    jackrabbit/trunk/jackrabbit-data/src/test/resources/
    jackrabbit/trunk/jackrabbit-data/src/test/resources/log4j.properties   (with props)
Removed:
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestLocalCache.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java
Modified:
    jackrabbit/trunk/jackrabbit-aws-ext/pom.xml
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/aws.properties
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml
    jackrabbit/trunk/jackrabbit-core/pom.xml
    jackrabbit/trunk/jackrabbit-data/pom.xml
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java

Modified: jackrabbit/trunk/jackrabbit-aws-ext/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/pom.xml?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/pom.xml (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/pom.xml Thu Mar 13 12:12:33 2014
@@ -53,6 +53,12 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-data</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>1.7.5</version>
@@ -69,12 +75,6 @@
             <version>1.7.5</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.jackrabbit</groupId>
-            <artifactId>jackrabbit-core</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-        </dependency>
     </dependencies>
     <build>
         <plugins>

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java Thu Mar 13 12:12:33 2014
@@ -21,7 +21,7 @@ package org.apache.jackrabbit.aws.ext;
  * Defined Amazon S3 constants.
  */
 public final class S3Constants {
-    
+
     /**
      * Amazon aws access key.
      */
@@ -41,7 +41,22 @@ public final class S3Constants {
      * Amazon aws S3 region.
      */
     public static final String S3_REGION = "s3Region";
-   
+    
+    /**
+     * Amazon aws S3 endpoint.
+     */
+    public static final String S3_END_POINT = "s3EndPoint";
+
+    /**
+     * Constant controlling whether existing S3 keys are renamed to the new format.
+     */
+    public static final String S3_RENAME_KEYS = "s3RenameKeys";
+
+    /**
+     * Constant for the number of S3 write (upload) threads.
+     */
+    public static final String S3_WRITE_THREADS = "writeThreads";
+
     /**
     * Private constructor so that the class cannot be initialized from outside.
      */

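Taken together, the three new keys make the backend configurable beyond region selection: s3EndPoint overrides the endpoint derived from s3Region, s3RenameKeys toggles the one-time key-format migration, and writeThreads sizes the upload pool. A minimal sketch of setting these keys programmatically (the values are illustrative only; credential and bucket keys are omitted, and the defaults noted in the comments are those visible in the S3Backend diff below):

    import java.util.Properties;

    public class S3ConfigSketch {
        public static Properties newS3Properties() {
            Properties prop = new Properties();
            // Region of the bucket (S3Constants.S3_REGION).
            prop.setProperty("s3Region", "eu-west-1");
            // Optional explicit endpoint, overriding the region-derived
            // default (S3Constants.S3_END_POINT).
            prop.setProperty("s3EndPoint", "s3-eu-west-1.amazonaws.com");
            // Whether to rename keys from the old dataStore_ format on
            // startup (S3Constants.S3_RENAME_KEYS); treated as true when unset.
            prop.setProperty("s3RenameKeys", "false");
            // Number of upload threads for the S3 TransferManager
            // (S3Constants.S3_WRITE_THREADS); S3Backend defaults to 10.
            prop.setProperty("writeThreads", "10");
            return prop;
        }
    }
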
Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java Thu Mar 13 12:12:33 2014
@@ -21,8 +21,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
 import org.slf4j.Logger;
@@ -31,9 +29,8 @@ import org.slf4j.LoggerFactory;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 
@@ -42,10 +39,10 @@ import com.amazonaws.services.s3.model.S
  */
 public final class Utils {
 
-    public static final String DEFAULT_CONFIG_FILE = "aws.properties";
-    
     private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
+    public static final String DEFAULT_CONFIG_FILE = "aws.properties";
+
     private static final String DELETE_CONFIG_SUFFIX = ";burn";
 
     /**
@@ -81,35 +78,15 @@ public final class Utils {
     * Delete an S3 bucket. This method first deletes all objects from the
     * bucket and then deletes the empty bucket itself.
      * 
-     * @param prop properties to configure @link {@link AmazonS3Client} and
-     * delete bucket.
+     * @param bucketName the bucket name.
      */
-    public static void deleteBucket(final Properties prop) throws IOException {
-        AmazonS3Client s3service = openService(prop);
-        String bucketName = prop.getProperty(S3Constants.S3_BUCKET);
-        if (!s3service.doesBucketExist(bucketName)) {
-            return;
-        }
+    public static void deleteBucket(final String bucketName) throws IOException {
+        Properties prop = readConfig(DEFAULT_CONFIG_FILE);
+        AmazonS3 s3service = openService(prop);
         ObjectListing prevObjectListing = s3service.listObjects(bucketName);
         while (true) {
-            List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
             for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
-                deleteList.add(new DeleteObjectsRequest.KeyVersion(
-                    s3ObjSumm.getKey()));
-            }
-            if (deleteList.size() > 0) {
-                DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
-                    bucketName);
-                delObjsReq.setKeys(deleteList);
-                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
-                if (dobjs.getDeletedObjects().size() != deleteList.size()) {
-                    throw new IOException(
-                        "Incomplete delete object request. only  "
-                            + dobjs.getDeletedObjects().size() + " out of "
-                            + deleteList.size() + " are deleted");
-                }
-                LOG.info(deleteList.size()
-                        + " records deleted from datastore");
+                s3service.deleteObject(bucketName, s3ObjSumm.getKey());
             }
             if (!prevObjectListing.isTruncated()) {
                 break;
@@ -117,7 +94,6 @@ public final class Utils {
             prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
         }
         s3service.deleteBucket(bucketName);
-        s3service.shutdown();
     }
 
     /**

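The reworked deleteBucket above now reads the default aws.properties itself, deletes objects one key at a time instead of sending batched DeleteObjectsRequests, and no longer shuts the client down. A usage sketch under those assumptions, with a placeholder bucket name:

    import java.io.IOException;

    import org.apache.jackrabbit.aws.ext.Utils;

    public class DeleteBucketSketch {
        public static void main(String[] args) throws IOException {
            // Lists the bucket page by page, deletes every object,
            // then removes the (now empty) bucket itself.
            Utils.deleteBucket("my-test-bucket");
        }
    }
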
Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java Thu Mar 13 12:12:33 2014
@@ -20,22 +20,26 @@ package org.apache.jackrabbit.aws.ext.ds
 import java.io.File;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.aws.ext.S3Constants;
 import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
 import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
 import org.apache.jackrabbit.core.data.DataIdentifier;
 import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +50,8 @@ import com.amazonaws.services.s3.model.D
 import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.ProgressEvent;
+import com.amazonaws.services.s3.model.ProgressListener;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.Region;
 import com.amazonaws.services.s3.model.S3Object;
@@ -58,6 +64,11 @@ import com.amazonaws.services.s3.transfe
  */
 public class S3Backend implements Backend {
 
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+
     private static final String KEY_PREFIX = "dataStore_";
 
     /**
@@ -75,12 +86,7 @@ public class S3Backend implements Backen
     private static final String DOT = ".";
 
     private static final String DASH = "-";
-
-    /**
-     * Logger instance.
-     */
-    private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
-
+    
     private AmazonS3Client s3service;
 
     private String bucket;
@@ -89,6 +95,10 @@ public class S3Backend implements Backen
 
     private CachingDataStore store;
 
+    private Properties prop;
+
+    private Date startTime;
+
     /**
      * Initialize S3Backend. It creates AmazonS3Client and TransferManager from
      * aws.properties. It creates S3 bucket if it doesn't pre-exist in S3.
@@ -99,12 +109,20 @@ public class S3Backend implements Backen
         if (config == null) {
             config = Utils.DEFAULT_CONFIG_FILE;
         }
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            Properties prop = Utils.readConfig(config);
-            LOG.debug("init");
+            startTime = new Date();
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            prop = Utils.readConfig(config);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("init");
+            }
             this.store = store;
             s3service = Utils.openService(prop);
-            bucket = prop.getProperty(S3Constants.S3_BUCKET);
+            if (bucket == null || "".equals(bucket.trim())) {
+                bucket = prop.getProperty(S3Constants.S3_BUCKET);
+            }
             String region = prop.getProperty(S3Constants.S3_REGION);
             String endpoint = null;
             if (!s3service.doesBucketExist(bucket)) {
@@ -130,6 +148,10 @@ public class S3Backend implements Backen
                     endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
                 }
             }
+            String propEndPoint = prop.getProperty(S3Constants.S3_END_POINT);
+            if (propEndPoint != null && !"".equals(propEndPoint)) {
+                endpoint = propEndPoint;
+            }
             /*
              * setting endpoint to remove latency of redirection. If endpoint is
             * not set, invocation first goes to the US standard region, which
@@ -137,12 +159,39 @@ public class S3Backend implements Backen
              */
             s3service.setEndpoint(endpoint);
             LOG.info("S3 service endpoint: " + endpoint);
-            tmx = new TransferManager(s3service, createDefaultExecutorService());
-            LOG.debug("  done");
+
+            int writeThreads = 10;
+            String writeThreadsStr = prop.getProperty(S3Constants.S3_WRITE_THREADS);
+            if (writeThreadsStr != null) {
+                writeThreads = Integer.parseInt(writeThreadsStr);
+            }
+            LOG.info("Using thread pool of [" + writeThreads
+                + "] threads in S3 transfer manager");
+            tmx = new TransferManager(s3service,
+                (ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads,
+                    new NamedThreadFactory("s3-transfer-manager-worker")));
+            String renameKeyProp = prop.getProperty(S3Constants.S3_RENAME_KEYS);
+            boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp))
+                    ? true
+                    : Boolean.parseBoolean(renameKeyProp);
+            if (renameKeyBool) {
+                renameKeys();
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("S3 Backend initialized in ["
+                    + (System.currentTimeMillis() - startTime.getTime())
+                    + "] ms");
+            }
         } catch (Exception e) {
-            LOG.debug("  error ", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("  error ", e);
+            }
             throw new DataStoreException("Could not initialize S3 from "
                 + config, e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
     }
 
@@ -153,128 +202,147 @@ public class S3Backend implements Backen
     @Override
     public void write(DataIdentifier identifier, File file)
             throws DataStoreException {
-        String key = getKeyName(identifier);
-        ObjectMetadata objectMetaData = null;
-        long start = System.currentTimeMillis();
-        LOG.debug("write {0} length {1}", identifier, file.length());
-        try {
-            // check if the same record already exists
-            try {
-                objectMetaData = s3service.getObjectMetadata(bucket, key);
-            } catch (AmazonServiceException ase) {
-                if (ase.getStatusCode() != 404) {
-                    throw ase;
-                }
-            }
-            if (objectMetaData != null) {
-                long l = objectMetaData.getContentLength();
-                if (l != file.length()) {
-                    throw new DataStoreException("Collision: " + key
-                        + " new length: " + file.length() + " old length: " + l);
-                }
-                LOG.debug(key + "   exists");
-                CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
-                    bucket, key);
-                copReq.setNewObjectMetadata(objectMetaData);
-                s3service.copyObject(copReq);
-                LOG.debug("lastModified of " + identifier.toString()
-                    + " updated successfully");
-                LOG.debug("   updated");
-            }
-        } catch (AmazonServiceException e) {
-            LOG.debug("   does not exist", e);
-            // not found - create it
-        }
-        if (objectMetaData == null) {
-            LOG.debug("   creating");
-            try {
-                // start multipart parallel upload using amazon sdk
-                Upload up = tmx.upload(new PutObjectRequest(bucket, key, file));
-                // wait for upload to finish
-                up.waitForUploadResult();
-                LOG.debug("   done");
-            } catch (Exception e2) {
-                LOG.debug("   could not upload", e2);
-                throw new DataStoreException("Could not upload " + key, e2);
-            }
-        }
-        LOG.debug("    ms: {0}", System.currentTimeMillis() - start);
+        this.write(identifier, file, false, null);
 
     }
 
+    @Override
+    public void writeAsync(DataIdentifier identifier, File file,
+            AsyncUploadCallback callback) throws DataStoreException {
+        if (callback == null) {
+            throw new IllegalArgumentException(
+                "callback parameter cannot be null in asyncUpload");
+        }
+        Thread th = new Thread(new AsyncUploadJob(identifier, file, callback));
+        th.start();
+    }
+
     /**
      * Check if record identified by identifier exists in Amazon S3.
      */
     @Override
     public boolean exists(DataIdentifier identifier) throws DataStoreException {
+        long start = System.currentTimeMillis();
         String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            LOG.debug("exists {0}", identifier);
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
             ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
                 key);
             if (objectMetaData != null) {
-                LOG.debug("  true");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("exists [" + identifier + "]: [true] took ["
+                        + (System.currentTimeMillis() - start) + "] ms");
+                }
                 return true;
             }
             return false;
         } catch (AmazonServiceException e) {
             if (e.getStatusCode() == 404) {
-                LOG.info("key [" + identifier.toString() + "] not found.");
+                LOG.info("exists [" + identifier + "]: [false] took ["
+                    + (System.currentTimeMillis() - start) + "] ms");
                 return false;
             }
             throw new DataStoreException(
                 "Error occured to getObjectMetadata for key ["
                     + identifier.toString() + "]", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
     }
 
     @Override
-    public void touch(DataIdentifier identifier, long minModifiedDate)
+    public boolean exists(DataIdentifier identifier, boolean touch)
             throws DataStoreException {
+        long start = System.currentTimeMillis();
         String key = getKeyName(identifier);
+        ObjectMetadata objectMetaData = null;
+        boolean retVal = false;
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            if (minModifiedDate != 0) {
-                ObjectMetadata objectMetaData = s3service.getObjectMetadata(
-                    bucket, key);
-                if (objectMetaData.getLastModified().getTime() < minModifiedDate) {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            objectMetaData = s3service.getObjectMetadata(bucket, key);
+            if (objectMetaData != null) {
+                retVal = true;
+                if (touch) {
                     CopyObjectRequest copReq = new CopyObjectRequest(bucket,
                         key, bucket, key);
                     copReq.setNewObjectMetadata(objectMetaData);
                     s3service.copyObject(copReq);
-                    LOG.debug("lastModified of " + identifier.toString()
-                        + " updated successfully");
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("[ " + identifier.toString()
+                            + "] touched took ["
+                            + (System.currentTimeMillis() - start) + "] ms");
+                    }
                 }
+            } else {
+                retVal = false;
+            }
+
+        } catch (AmazonServiceException e) {
+            if (e.getStatusCode() == 404) {
+                retVal = false;
+            } else {
+                throw new DataStoreException(
+                    "Error occured to find exists for key ["
+                        + identifier.toString() + "]", e);
             }
         } catch (Exception e) {
             throw new DataStoreException(
-                "An Exception occurred while trying to set the last modified date of record "
+                "Error occured to find exists for key  "
                     + identifier.toString(), e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("exists [" + identifier + "]: [" + retVal + "] took ["
+                + (System.currentTimeMillis() - start) + "] ms");
+        }
+        return retVal;
     }
 
     @Override
     public InputStream read(DataIdentifier identifier)
             throws DataStoreException {
+        long start = System.currentTimeMillis();
         String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            LOG.debug("read {" + identifier + "}");
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
             S3Object object = s3service.getObject(bucket, key);
             InputStream in = object.getObjectContent();
-            LOG.debug("  return");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[ " + identifier.toString() + "] read took ["
+                    + (System.currentTimeMillis() - start) + "] ms");
+            }
             return in;
         } catch (AmazonServiceException e) {
             throw new DataStoreException("Object not found: " + key, e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
     }
 
     @Override
     public Iterator<DataIdentifier> getAllIdentifiers()
             throws DataStoreException {
+        long start = System.currentTimeMillis();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            LOG.debug("getAllIdentifiers");
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
             Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
-            ObjectListing prevObjectListing = s3service.listObjects(bucket,
-                KEY_PREFIX);
+            ObjectListing prevObjectListing = s3service.listObjects(bucket);
             while (true) {
                 for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
                     String id = getIdentifierName(s3ObjSumm.getKey());
@@ -282,140 +350,480 @@ public class S3Backend implements Backen
                         ids.add(new DataIdentifier(id));
                     }
                 }
-                if (!prevObjectListing.isTruncated()) {
-                    break;
-                }
+                if (!prevObjectListing.isTruncated()) break;
                 prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
             }
-            LOG.debug("  return");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("getAllIdentifiers returned size [ " + ids.size()
+                    + "] took [" + (System.currentTimeMillis() - start)
+                    + "] ms");
+            }
             return ids.iterator();
         } catch (AmazonServiceException e) {
             throw new DataStoreException("Could not list objects", e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
     }
 
     @Override
     public long getLastModified(DataIdentifier identifier)
             throws DataStoreException {
+        long start = System.currentTimeMillis();
         String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
             ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
-            return object.getLastModified().getTime();
+            long lastModified = object.getLastModified().getTime();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Identifier [" + identifier.toString()
+                    + "] 's lastModified = [" + lastModified + "] took ["
+                    + (System.currentTimeMillis() - start) + "] ms");
+            }
+            return lastModified;
         } catch (AmazonServiceException e) {
-            throw new DataStoreException(
-                "Could not getLastModified of dataIdentifier " + identifier, e);
+            if (e.getStatusCode() == 404) {
+                LOG.info("getLastModified:Identifier [" + identifier.toString()
+                    + "] not found. Took ["
+                    + (System.currentTimeMillis() - start) + "]ms");
+            }
+            throw new DataStoreException(e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
     }
 
     @Override
     public long getLength(DataIdentifier identifier) throws DataStoreException {
+        long start = System.currentTimeMillis();
         String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
             ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
-            return object.getContentLength();
+            long length = object.getContentLength();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Identifier [" + identifier.toString()
+                    + "] 's length = [" + length + "] took ["
+                    + (System.currentTimeMillis() - start) + "] ms");
+            }
+            return length;
         } catch (AmazonServiceException e) {
             throw new DataStoreException("Could not length of dataIdentifier "
                 + identifier, e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
     }
 
     @Override
     public void deleteRecord(DataIdentifier identifier)
             throws DataStoreException {
+        long start = System.currentTimeMillis();
         String key = getKeyName(identifier);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
             s3service.deleteObject(bucket, key);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Identifier [" + identifier.toString()
+                    + "] 's deleted. It took ["
+                    + (System.currentTimeMillis() - start) + "] ms");
+            }
         } catch (AmazonServiceException e) {
             throw new DataStoreException(
                 "Could not getLastModified of dataIdentifier " + identifier, e);
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public Set<DataIdentifier> deleteAllOlderThan(long min)
+            throws DataStoreException {
+        long start = System.currentTimeMillis();
+        // S3 stores lastModified rounded down to the lower second boundary,
+        // hence min is reduced by 1000 ms.
+        min = min - 1000;
+        Set<DataIdentifier> deleteIdSet = new HashSet<DataIdentifier>(30);
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ObjectListing prevObjectListing = s3service.listObjects(bucket);
+            while (true) {
+                List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                    DataIdentifier identifier = new DataIdentifier(
+                        getIdentifierName(s3ObjSumm.getKey()));
+                    long lastModified = s3ObjSumm.getLastModified().getTime();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("id [" + identifier + "], lastModified ["
+                            + lastModified + "]");
+                    }
+                    if (!store.isInUse(identifier) && lastModified < min) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("add id :" + s3ObjSumm.getKey()
+                                + " to delete lists");
+                        }
+                        deleteList.add(new DeleteObjectsRequest.KeyVersion(
+                            s3ObjSumm.getKey()));
+                        deleteIdSet.add(identifier);
+                    }
+                }
+                if (deleteList.size() > 0) {
+                    DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+                        bucket);
+                    delObjsReq.setKeys(deleteList);
+                    DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                    if (dobjs.getDeletedObjects().size() != deleteList.size()) {
+                        throw new DataStoreException(
+                            "Incomplete delete object request. only  "
+                                + dobjs.getDeletedObjects().size() + " out of "
+                                + deleteList.size() + " are deleted");
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(deleteList.size()
+                                + " records deleted from datastore");
+                        }
+                    }
+                }
+                if (!prevObjectListing.isTruncated()) {
+                    break;
+                }
+                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
         }
+        LOG.info("deleteAllOlderThan: min=[" + min + "] exit. Deleted ["
+            + deleteIdSet + "] records. Number of records deleted ["
+            + deleteIdSet.size() + "] took ["
+            + (System.currentTimeMillis() - start) + "] ms");
+        return deleteIdSet;
     }
 
     @Override
-    public List<DataIdentifier> deleteAllOlderThan(long min)
+    public void close() {
+        // backend is closing. Abort all multipart uploads started since init.
+        tmx.abortMultipartUploads(bucket, startTime);
+        tmx.shutdownNow();
+        s3service.shutdown();
+        LOG.info("S3Backend closed.");
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+
+    private void write(DataIdentifier identifier, File file,
+            boolean asyncUpload, AsyncUploadCallback callback)
             throws DataStoreException {
-        LOG.info("deleteAllOlderThan " + new Date(min));
-        List<DataIdentifier> diDeleteList = new ArrayList<DataIdentifier>(30);
-        ObjectListing prevObjectListing = s3service.listObjects(bucket);
-        while (true) {
+        String key = getKeyName(identifier);
+        ObjectMetadata objectMetaData = null;
+        long start = System.currentTimeMillis();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            // check if the same record already exists
+            try {
+                objectMetaData = s3service.getObjectMetadata(bucket, key);
+            } catch (AmazonServiceException ase) {
+                if (ase.getStatusCode() != 404) {
+                    throw ase;
+                }
+            }
+            if (objectMetaData != null) {
+                long l = objectMetaData.getContentLength();
+                if (l != file.length()) {
+                    throw new DataStoreException("Collision: " + key
+                        + " new length: " + file.length() + " old length: " + l);
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(key + "   exists, lastmodified ="
+                        + objectMetaData.getLastModified().getTime());
+                }
+                CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
+                    bucket, key);
+                copReq.setNewObjectMetadata(objectMetaData);
+                s3service.copyObject(copReq);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("lastModified of " + identifier.toString()
+                        + " updated successfully");
+                }
+                if (callback != null) {
+                    callback.call(identifier, file,
+                        AsyncUploadCallback.RESULT.SUCCESS);
+                }
+            }
+
+            if (objectMetaData == null) {
+                try {
+                    // start multipart parallel upload using amazon sdk
+                    Upload up = tmx.upload(new PutObjectRequest(bucket, key,
+                        file));
+                    // wait for upload to finish
+                    if (asyncUpload) {
+                        up.addProgressListener(new S3UploadProgressListener(
+                            identifier, file, callback));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("added upload progress listener to identifier ["
+                                + identifier + "]");
+                        }
+                    } else {
+                        up.waitForUploadResult();
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("synchronous upload to identifier ["
+                                + identifier + "] completed.");
+                        }
+                        if (callback != null) {
+                            callback.call(identifier, file,
+                                AsyncUploadCallback.RESULT.SUCCESS);
+                        }
+                    }
+                } catch (Exception e2) {
+                    if (!asyncUpload) {
+                        callback.call(identifier, file,
+                            AsyncUploadCallback.RESULT.ABORTED);
+                    }
+                    throw new DataStoreException("Could not upload " + key, e2);
+                }
+            }
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("write [" + identifier + "] length [" + file.length()
+                + "], in async mode [" + asyncUpload + "] in ["
+                + (System.currentTimeMillis() - start) + "] ms.");
+        }
+    }
+
+    /**
+     * This method renames object keys in S3 concurrently. The number of
+     * concurrent threads is defined by the 'maxConnections' property in
+     * aws.properties. As S3 has no "move" command, this method simulates a
+     * move by copying each object to its new key and then deleting the old key.
+     */
+    private void renameKeys() throws DataStoreException {
+        long startTime = System.currentTimeMillis();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        long count = 0;
+        try {
+            Thread.currentThread().setContextClassLoader(
+                getClass().getClassLoader());
+            ObjectListing prevObjectListing = s3service.listObjects(bucket,
+                KEY_PREFIX);
             List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
-            for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
-                DataIdentifier identifier = new DataIdentifier(
-                    getIdentifierName(s3ObjSumm.getKey()));
-                if (!store.isInUse(identifier)
-                    && s3ObjSumm.getLastModified().getTime() < min) {
-                    LOG.info("add id :" + s3ObjSumm.getKey()
-                        + " to delete lists");
+            int nThreads = Integer.parseInt(prop.getProperty("maxConnections"));
+            ExecutorService executor = Executors.newFixedThreadPool(nThreads,
+                new NamedThreadFactory("s3-object-rename-worker"));
+            boolean taskAdded = false;
+            while (true) {
+                for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                    executor.execute(new KeyRenameThread(s3ObjSumm.getKey()));
+                    taskAdded = true;
+                    count++;
                     deleteList.add(new DeleteObjectsRequest.KeyVersion(
                         s3ObjSumm.getKey()));
-                    diDeleteList.add(new DataIdentifier(
-                        getIdentifierName(s3ObjSumm.getKey())));
                 }
+                if (!prevObjectListing.isTruncated()) break;
+                prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+            }
+            // This will make the executor accept no new threads
+            // and finish all existing threads in the queue
+            executor.shutdown();
+
+            try {
+                // Wait until all threads are finish
+                while (taskAdded
+                    && !executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                    LOG.info("Rename S3 keys tasks timedout. Waiting again");
+                }
+            } catch (InterruptedException ie) {
+                // interrupted while waiting; proceed to delete the old keys
             }
+            LOG.info("Renamed [" + count + "] keys, time taken ["
+                + ((System.currentTimeMillis() - startTime) / 1000) + "] sec");
+            // Delete older keys.
             if (deleteList.size() > 0) {
                 DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
                     bucket);
-                delObjsReq.setKeys(deleteList);
-                DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
-                if (dobjs.getDeletedObjects().size() != deleteList.size()) {
-                    throw new DataStoreException(
-                        "Incomplete delete object request. only  "
-                            + dobjs.getDeletedObjects().size() + " out of "
-                            + deleteList.size() + " are deleted");
-                }
-                LOG.info(deleteList.size() + " records deleted from datastore");
+                int batchSize = 500, startIndex = 0, size = deleteList.size();
+                int endIndex = batchSize < size ? batchSize : size;
+                while (endIndex <= size) {
+                    delObjsReq.setKeys(Collections.unmodifiableList(deleteList.subList(
+                        startIndex, endIndex)));
+                    DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+                    LOG.info("Records[" + dobjs.getDeletedObjects().size()
+                        + "] deleted in datastore from index [" + startIndex
+                        + "] to [" + (endIndex - 1) + "]");
+                    if (endIndex == size) {
+                        break;
+                    } else {
+                        startIndex = endIndex;
+                        endIndex = (startIndex + batchSize) < size
+                                ? (startIndex + batchSize)
+                                : size;
+                    }
+                }
             }
-            if (!prevObjectListing.isTruncated()) {
-                break;
+        } finally {
+            if (contextClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(contextClassLoader);
             }
-            prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
         }
-        LOG.info("deleteAllOlderThan  exit");
-        return diDeleteList;
     }
 
-    @Override
-    public void close() {
-        s3service.shutdown();
-        s3service = null;
-        tmx = null;
+    /**
+     * This method converts the old key format to the new format. For example,
+     * it converts the old key dataStore_004cb70c8f87d78f04da41e7547cb434094089ea
+     * to 004c-b70c8f87d78f04da41e7547cb434094089ea.
+     */
+    private static String convertKey(String oldKey)
+            throws IllegalArgumentException {
+        if (!oldKey.startsWith(KEY_PREFIX)) {
+            throw new IllegalArgumentException("[" + oldKey
+                + "] doesn't start with prefix [" + KEY_PREFIX + "]");
+        }
+        String key = oldKey.substring(KEY_PREFIX.length());
+        return key.substring(0, 4) + DASH + key.substring(4);
     }
 
     /**
      * Get key from data identifier. Object is stored with key in S3.
      */
     private static String getKeyName(DataIdentifier identifier) {
-        return KEY_PREFIX + identifier.toString();
+        String key = identifier.toString();
+        return key.substring(0, 4) + DASH + key.substring(4);
     }
 
     /**
      * Get data identifier from key.
      */
     private static String getIdentifierName(String key) {
-        if (!key.startsWith(KEY_PREFIX)) {
+        if (!key.contains(DASH)) {
             return null;
         }
-        return key.substring(KEY_PREFIX.length());
+        return key.substring(0, 4) + key.substring(5);
+    }
+
+    /**
+     * This class renames an object key in S3 on a worker thread.
+     */
+    private class KeyRenameThread implements Runnable {
+
+        private String oldKey;
+
+        public void run() {
+            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+            try {
+                Thread.currentThread().setContextClassLoader(
+                    getClass().getClassLoader());
+                String newS3Key = convertKey(oldKey);
+                CopyObjectRequest copReq = new CopyObjectRequest(bucket,
+                    oldKey, bucket, newS3Key);
+                s3service.copyObject(copReq);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(oldKey + " renamed to " + newS3Key);
+                }
+            } finally {
+                if (contextClassLoader != null) {
+                    Thread.currentThread().setContextClassLoader(
+                        contextClassLoader);
+                }
+            }
+        }
+
+        public KeyRenameThread(String oldKey) {
+            this.oldKey = oldKey;
+        }
     }
 
     /**
-     * Returns a new thread pool configured with the default settings.
-     * 
-     * @return A new thread pool configured with the default settings.
+     * Listener which receives callback on status of S3 upload.
      */
-    private ThreadPoolExecutor createDefaultExecutorService() {
-        ThreadFactory threadFactory = new ThreadFactory() {
-            private int threadCount = 1;
+    private class S3UploadProgressListener implements ProgressListener {
+
+        private File file;
+
+        private DataIdentifier identifier;
+
+        private AsyncUploadCallback callback;
 
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r);
-                thread.setContextClassLoader(getClass().getClassLoader());
-                thread.setName("s3-transfer-manager-worker-" + threadCount++);
-                return thread;
+        public S3UploadProgressListener(DataIdentifier identifier, File file,
+                AsyncUploadCallback callback) {
+            super();
+            this.identifier = identifier;
+            this.file = file;
+            this.callback = callback;
+        }
+
+        public void progressChanged(ProgressEvent progressEvent) {
+            switch (progressEvent.getEventCode()) {
+                case ProgressEvent.COMPLETED_EVENT_CODE:
+                    callback.call(identifier, file,
+                        AsyncUploadCallback.RESULT.SUCCESS);
+                    break;
+                case ProgressEvent.FAILED_EVENT_CODE:
+                    callback.call(identifier, file,
+                        AsyncUploadCallback.RESULT.FAILED);
+                    break;
+                default:
+                    break;
             }
-        };
-        return (ThreadPoolExecutor) Executors.newFixedThreadPool(10,
-            threadFactory);
+        }
+    }
+
+    /**
+     * This class implements the {@link Runnable} interface to upload a {@link File}
+     * to S3 asynchronously.
+     */
+    private class AsyncUploadJob implements Runnable {
+
+        private DataIdentifier identifier;
+
+        private File file;
+
+        private AsyncUploadCallback callback;
+
+        public AsyncUploadJob(DataIdentifier identifier, File file,
+                AsyncUploadCallback callback) {
+            super();
+            this.identifier = identifier;
+            this.file = file;
+            this.callback = callback;
+        }
+
+        public void run() {
+            try {
+                write(identifier, file, true, callback);
+            } catch (DataStoreException e) {
+                LOG.error("Could not upload [" + identifier + "], file[" + file
+                    + "]", e);
+            }
+
+        }
     }
 }

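The asynchronous write path above is driven entirely through AsyncUploadCallback: writeAsync spawns a thread, the private write(...) registers an S3UploadProgressListener, and the listener maps S3 progress events to the RESULT values (SUCCESS, FAILED, ABORTED) seen at the call sites. A caller sketch, assuming AsyncUploadCallback is an interface with the single call(DataIdentifier, File, RESULT) method used in this file:

    import java.io.File;

    import org.apache.jackrabbit.aws.ext.ds.S3Backend;
    import org.apache.jackrabbit.core.data.AsyncUploadCallback;
    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.apache.jackrabbit.core.data.DataStoreException;

    public class AsyncWriteSketch {
        static void uploadInBackground(S3Backend backend, DataIdentifier id,
                File file) throws DataStoreException {
            backend.writeAsync(id, file, new AsyncUploadCallback() {
                public void call(DataIdentifier identifier, File f,
                        AsyncUploadCallback.RESULT result) {
                    // result is one of SUCCESS, FAILED or ABORTED,
                    // as reported by the write path above.
                    System.out.println(identifier + " finished with " + result);
                }
            });
        }
    }
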
Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java Thu Mar 13 12:12:33 2014
@@ -21,11 +21,9 @@ import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
 
-import org.apache.jackrabbit.aws.ext.ds.TestCaseBase;
-import org.apache.jackrabbit.aws.ext.ds.TestInMemDs;
-import org.apache.jackrabbit.aws.ext.ds.TestInMemDsCacheOff;
 import org.apache.jackrabbit.aws.ext.ds.TestS3Ds;
 import org.apache.jackrabbit.aws.ext.ds.TestS3DsCacheOff;
+import org.apache.jackrabbit.core.data.TestCaseBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +42,6 @@ public class TestAll extends TestCase {
      */
     public static Test suite() {
         TestSuite suite = new TestSuite("S3 tests");
-        suite.addTestSuite(TestLocalCache.class);
-        suite.addTestSuite(TestInMemDs.class);
-        suite.addTestSuite(TestInMemDsCacheOff.class);
         String config = System.getProperty(TestCaseBase.CONFIG);
         LOG.info("config= " + config);
         if (config != null && !"".equals(config.trim())) {

Added: jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3TestDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3TestDataStore.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3TestDataStore.java (added)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3TestDataStore.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import org.apache.jackrabbit.core.data.Backend;
+
+/**
+ * This class initializes {@link S3DataStore} with the given bucket. The other
+ * configuration is taken from the configuration file. This class exists so
+ * that each test case runs in its own bucket; this is required because
+ * deletions in a bucket are not immediately reflected in the next test case.
+ */
+public class S3TestDataStore extends S3DataStore {
+    String bucket;
+
+    public S3TestDataStore() {
+        super();
+    }
+
+    public S3TestDataStore(String bucket) {
+        super();
+        this.bucket = bucket;
+    }
+
+    protected Backend createBackend() {
+        Backend backend = new S3Backend();
+        ((S3Backend) backend).setBucket(bucket);
+        return backend;
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3TestDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

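S3TestDataStore is wired up by TestS3Ds.createDataStore() below: each test picks a fresh bucket name, then configures and initializes the store. A condensed sketch of that pattern (bucket name, config path and home directory are placeholders; setConfig and init are used exactly as in createDataStore):

    import org.apache.jackrabbit.aws.ext.ds.S3TestDataStore;

    public class S3TestDataStoreSketch {
        public static void main(String[] args) throws Exception {
            // A fresh bucket per run, so eventually consistent deletions
            // from a previous run cannot leak into this one.
            S3TestDataStore ds = new S3TestDataStore("4711-test");
            ds.setConfig("aws.properties");  // S3 connection properties
            ds.init("target/s3-datastore");  // local datastore home directory
        }
    }
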
Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java Thu Mar 13 12:12:33 2014
@@ -16,11 +16,25 @@
  */
 package org.apache.jackrabbit.aws.ext.ds;
 
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 import java.util.Properties;
 
+import javax.jcr.RepositoryException;
+
 import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.TestCaseBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
 
 /**
  * Test {@link CachingDataStore} with S3Backend and local cache on. It requires
@@ -30,18 +44,71 @@ import org.apache.jackrabbit.core.data.C
  */
 public class TestS3Ds extends TestCaseBase {
 
-    public TestS3Ds() {
-        config = System.getProperty(CONFIG);
-        memoryBackend = false;
-        noCache = false;
-    }
+    protected static final Logger LOG = LoggerFactory.getLogger(TestS3Ds.class);
 
+    private Date startTime = null;
 
-    @Override
-    protected void tearDown() throws IOException {
+    public TestS3Ds() {
+        config = System.getProperty(CONFIG);
+        memoryBackend = false;
+        noCache = false;
+    }
+
+    protected void setUp() throws Exception {
+        startTime = new Date();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        deleteBucket();
         super.tearDown();
+    }
+
+    protected CachingDataStore createDataStore() throws RepositoryException {
+        ds = new S3TestDataStore(String.valueOf(randomGen.nextInt(9999)) + "-test");
+        ds.setConfig(config);
+        if (noCache) {
+            ds.setCacheSize(0);
+        }
+        ds.init(dataStoreDir);
+        return ds;
+    }
+
+    /**
+     * Cleans the bucket after the test run.
+     */
+    public void deleteBucket() throws Exception {
         Properties props = Utils.readConfig(config);
-        Utils.deleteBucket(props);
+        AmazonS3Client s3service = Utils.openService(props);
+        Backend backend = ds.getBackend();
+        String bucket = ((S3Backend)backend).getBucket();
+        LOG.info("delete bucket [" + bucket + "]");
+        TransferManager tmx = new TransferManager(s3service);
+        if (s3service.doesBucketExist(bucket)) {
+            for (int i = 0; i < 3; i++) {
+                tmx.abortMultipartUploads(bucket, startTime);
+                ObjectListing prevObjectListing = s3service.listObjects(bucket);
+                while (prevObjectListing != null) {
+                    List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+                    for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+                        deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+                    }
+                    if (deleteList.size() > 0) {
+                        DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+                        delObjsReq.setKeys(deleteList);
+                        s3service.deleteObjects(delObjsReq);
+                    }
+                    if (!prevObjectListing.isTruncated()) break;
+                    prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+                }
+            }
+            s3service.deleteBucket(bucket);
+            LOG.info("bucket: " + bucket + " deleted");
+            tmx.shutdownNow();
+            s3service.shutdown();
+        }
     }
 
 }

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java Thu Mar 13 12:12:33 2014
@@ -17,6 +17,8 @@
 package org.apache.jackrabbit.aws.ext.ds;
 
 import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test {@link CachingDataStore} with S3Backend and local cache Off. It requires
@@ -26,6 +28,8 @@ import org.apache.jackrabbit.core.data.C
  */
 public class TestS3DsCacheOff extends TestS3Ds {
 
+    protected static final Logger LOG = LoggerFactory.getLogger(TestS3DsCacheOff.class);
+
     public TestS3DsCacheOff() {
         config = System.getProperty(CONFIG);
         memoryBackend = false;

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/aws.properties
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/aws.properties?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/aws.properties (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/aws.properties Thu Mar 13 12:12:33 2014
@@ -32,7 +32,14 @@ s3Bucket=
 # Asia Pacific (Tokyo) ap-northeast-1
 # South America (Sao Paulo) sa-east-1
 s3Region=
+# S3 endpoint to be used. This parameter is optional;
+# when set, it takes precedence over the endpoint
+# derived from the S3 region.
+s3EndPoint=
 connectionTimeout=120000
 socketTimeout=120000
-maxConnections=10
+maxConnections=20
 maxErrorRetry=10
+# Maximum number of concurrent threads used to write to S3.
+writeThreads=10
+
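
A rough sketch of the precedence rule described above; this is not the
actual S3Backend code, and the region-to-endpoint mapping shown is the
simplified historical pattern (us-east-1, for instance, really maps to
s3.amazonaws.com):

    import java.util.Properties;
    import com.amazonaws.services.s3.AmazonS3Client;

    public class EndpointConfig {
        // an explicit s3EndPoint wins; otherwise derive one from s3Region
        static void configureEndpoint(AmazonS3Client s3, Properties props) {
            String endPoint = props.getProperty("s3EndPoint");
            String region = props.getProperty("s3Region");
            if (endPoint != null && endPoint.trim().length() > 0) {
                s3.setEndpoint(endPoint.trim());
            } else if (region != null && region.trim().length() > 0) {
                s3.setEndpoint("s3-" + region.trim() + ".amazonaws.com");
            }
        }
    }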

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml Thu Mar 13 12:12:33 2014
@@ -40,7 +40,10 @@
         <param name="cacheSize" value="68719476736"/>
         <param name="cachePurgeTrigFactor" value="0.95d"/>
         <param name="cachePurgeResizeFactor" value="0.85d"/>
-	</DataStore>
+        <param name="continueOnAsyncUploadFailure" value="false"/>
+        <param name="concurrentUploadsThreads" value="10"/>
+        <param name="asyncUploadLimit" value="100"/>
+      </DataStore>
     <!--
         sample database data store configuration
         <DataStore class="org.apache.jackrabbit.core.data.db.DbDataStore">

Modified: jackrabbit/trunk/jackrabbit-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/pom.xml?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/pom.xml (original)
+++ jackrabbit/trunk/jackrabbit-core/pom.xml Thu Mar 13 12:12:33 2014
@@ -249,7 +249,13 @@ org.apache.jackrabbit.core.data.GarbageC
       <groupId>org.apache.jackrabbit</groupId>
       <artifactId>jackrabbit-data</artifactId>
       <version>${project.version}</version>
-    </dependency>    
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-data</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.apache.jackrabbit</groupId>
       <artifactId>jackrabbit-spi-commons</artifactId>

Modified: jackrabbit/trunk/jackrabbit-data/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/pom.xml?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/pom.xml (original)
+++ jackrabbit/trunk/jackrabbit-data/pom.xml Thu Mar 13 12:12:33 2014
@@ -44,6 +44,18 @@
 					</excludes>
 				</configuration>
 			</plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                     <execution>
+                         <goals>
+                             <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
 		</plugins>
 	</build>
 
@@ -86,6 +98,17 @@
 			<groupId>org.slf4j</groupId>
 			<artifactId>jcl-over-slf4j</artifactId>
 		</dependency>
-
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.5</version>
+            <scope>test</scope>
+        </dependency>
 	</dependencies>
 </project>
\ No newline at end of file

Added: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class keeps track of all in-progress asynchronous uploads. It
+ * maintains two data structures: {@link #asyncUploadMap}, a {@link Map} of
+ * file path to the lastModified timestamp of the upload, and
+ * {@link #toBeDeleted}, a {@link Set} of uploads that were marked for
+ * deletion while still in progress. Before starting an asynchronous upload,
+ * {@link #add(String)} must be invoked to add an entry to
+ * {@link #asyncUploadMap}. After the upload completes, {@link #remove(String)}
+ * must be invoked to remove the entry again. Any modification to this class
+ * is immediately persisted to the local file system: {@link #asyncUploadMap}
+ * is persisted to {@link #homeDir}/{@link #PENDIND_UPLOAD_FILE} and
+ * {@link #toBeDeleted} to {@link #homeDir}/{@link #TO_BE_DELETED_UPLOAD_FILE},
+ * where {@link #homeDir} refers to ${rep.home}.
+ */
+public class AsyncUploadCache {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncUploadCache.class);
+
+    /**
+     * {@link Map} of fileName vs. lastModified timestamp, tracking
+     * asynchronous uploads in progress.
+     */
+    Map<String, Long> asyncUploadMap = new HashMap<String, Long>();
+
+    /**
+     * {@link Set} of fileNames which are marked for deletion while their
+     * asynchronous upload is in progress.
+     */
+    Set<String> toBeDeleted = new HashSet<String>();
+
+    String path;
+
+    String homeDir;
+
+    int asyncUploadLimit;
+
+    private File pendingUploads;
+
+    private File toBeDeletedUploads;
+
+    private static final String PENDIND_UPLOAD_FILE = "async-pending-uploads.ser";
+
+    private static final String TO_BE_DELETED_UPLOAD_FILE = "async-tobedeleted-uploads.ser";
+
+    /**
+     * This method checks whether the file can be added to
+     * {@link #asyncUploadMap}. If so, it adds the entry and persists
+     * {@link #asyncUploadMap} to disk via {@link #serializeAsyncUploadMap()}.
+     *
+     * @return {@link AsyncUploadCacheResult} with
+     *         {@link AsyncUploadCacheResult#setAsyncUpload(boolean)} set to
+     *         true if the file was successfully added to the asynchronous
+     *         uploads, and to false otherwise.
+     */
+    public synchronized AsyncUploadCacheResult add(String fileName)
+            throws IOException {
+        AsyncUploadCacheResult result = new AsyncUploadCacheResult();
+        if (asyncUploadMap.entrySet().size() >= asyncUploadLimit) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Async write limit [" + asyncUploadLimit
+                    + "] reached. File [" + fileName
+                    + "]  not added to async write cache.");
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("current set =" + asyncUploadMap.keySet());
+            }
+            result.setAsyncUpload(false);
+        } else {
+            long startTime = System.currentTimeMillis();
+            if (toBeDeleted.remove(fileName)) {
+                serializeToBeDeleted();
+            }
+            asyncUploadMap.put(fileName, System.currentTimeMillis());
+            serializeAsyncUploadMap();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("added file [" + fileName
+                    + "] to asyncUploadMap upoad took ["
+                    + ((System.currentTimeMillis() - startTime) / 1000)
+                    + "] sec");
+                LOG.debug("current set =" + asyncUploadMap.keySet());
+            }
+            result.setAsyncUpload(true);
+        }
+        return result;
+    }
+
+    /**
+     * This method removes the file (if found) from {@link #asyncUploadMap}
+     * and, if it was found, immediately serializes {@link #asyncUploadMap}
+     * to disk. It sets
+     * {@link AsyncUploadCacheResult#setRequiresDelete(boolean)} to true if
+     * the asynchronous upload is found in the {@link #toBeDeleted} set,
+     * i.e. it was marked for deletion.
+     */
+    public synchronized AsyncUploadCacheResult remove(String fileName)
+            throws IOException {
+        long startTime = System.currentTimeMillis();
+        Long retVal = asyncUploadMap.remove(fileName);
+        if (retVal != null) {
+            serializeAsyncUploadMap();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("removed file [" + fileName
+                    + "] from asyncUploadMap took ["
+                    + ((System.currentTimeMillis() - startTime) / 1000)
+                    + "] sec");
+                LOG.debug("current set =" + asyncUploadMap.keySet());
+            }
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("cannot remove file [" + fileName
+                    + "] from pending upoad took ["
+                    + ((System.currentTimeMillis() - startTime) / 1000)
+                    + "] sec. File not found");
+                LOG.debug("current set =" + asyncUploadMap.keySet());
+            }
+        }
+        AsyncUploadCacheResult result = new AsyncUploadCacheResult();
+        result.setRequiresDelete(toBeDeleted.contains(fileName));
+        return result;
+    }
+
+    /**
+     * This method returns the in-progress asynchronous uploads which are
+     * not marked for deletion.
+     */
+    public synchronized Set<String> getAll() {
+        Set<String> retVal = new HashSet<String>();
+        retVal.addAll(asyncUploadMap.keySet());
+        retVal.removeAll(toBeDeleted);
+        return retVal;
+    }
+
+    /**
+     * This method checks whether an asynchronous upload is in progress for
+     * the given fileName. If touch is true, the lastModified timestamp is
+     * updated to the current time.
+     */
+    public synchronized boolean hasEntry(String fileName, boolean touch)
+            throws IOException {
+        boolean contains = asyncUploadMap.containsKey(fileName)
+            && !toBeDeleted.contains(fileName);
+        if (touch && contains) {
+            long timeStamp = System.currentTimeMillis();
+            asyncUploadMap.put(fileName, timeStamp);
+            serializeAsyncUploadMap();
+        }
+        return contains;
+    }
+
+    /**
+     * Returns the lastModified timestamp from {@link #asyncUploadMap} if
+     * found, and 0 otherwise.
+     */
+    public synchronized long getLastModified(String fileName) {
+        return asyncUploadMap.get(fileName) != null
+            && !toBeDeleted.contains(fileName)
+                ? asyncUploadMap.get(fileName)
+                : 0;
+    }
+
+    /**
+     * This method marks the asynchronous upload for the given fileName for
+     * deletion, if such an upload exists.
+     */
+    public synchronized void delete(String fileName) throws IOException {
+        boolean serialize = false;
+        if (toBeDeleted.remove(fileName)) {
+            serialize = true;
+        }
+        if (asyncUploadMap.containsKey(fileName) && toBeDeleted.add(fileName)) {
+            serialize = true;
+        }
+        if (serialize) {
+            serializeToBeDeleted();
+        }
+    }
+
+    /**
+     * Marks in-progress asynchronous uploads older than the given timestamp
+     * for deletion. This method leverages the lastModified stored in
+     * {@link #asyncUploadMap}.
+     */
+    public synchronized Set<String> deleteOlderThan(long min)
+            throws IOException {
+        min = min - 1000;
+        LOG.info("deleteOlderThan min =" + min);
+        Set<String> deleteSet = new HashSet<String>();
+        for (Map.Entry<String, Long> entry : asyncUploadMap.entrySet()) {
+            if (entry.getValue() < min) {
+                deleteSet.add(entry.getKey());
+            }
+        }
+        if (deleteSet.size() > 0) {
+            LOG.info("deleteOlderThan set =" + deleteSet);
+            toBeDeleted.addAll(deleteSet);
+            serializeToBeDeleted();
+        }
+        return deleteSet;
+    }
+
+    /**
+     * @param homeDir
+     *            home directory of repository.
+     * @param path
+     *            path of the {@link LocalCache}
+     * @param asyncUploadLimit
+     *            the maximum number of asynchronous uploads
+     */
+    public synchronized void init(String homeDir, String path,
+            int asyncUploadLimit) throws IOException, ClassNotFoundException {
+        this.homeDir = homeDir;
+        this.path = path;
+        this.asyncUploadLimit = asyncUploadLimit;
+        LOG.info("AsynWriteCache:homeDir [" + homeDir + "], path [" + path
+            + "], asyncUploadLimit [" + asyncUploadLimit + "].");
+        pendingUploads = new File(homeDir + "/" + PENDIND_UPLOAD_FILE);
+        if (pendingUploads.exists()) {
+            deserializeAsyncUploadMap();
+        } else {
+            pendingUploads.createNewFile();
+            asyncUploadMap = new HashMap<String, Long>();
+            serializeAsyncUploadMap();
+        }
+        toBeDeletedUploads = new File(homeDir + "/" + TO_BE_DELETED_UPLOAD_FILE);
+        if (toBeDeletedUploads.exists()) {
+            deserializeToBeDeleted();
+        } else {
+            toBeDeletedUploads.createNewFile();
+            toBeDeleted = new HashSet<String>();
+            serializeToBeDeleted();
+        }
+    }
+
+    /**
+     * Reset the {@link AsyncUploadCache} to empty {@link #asyncUploadMap} and
+     * {@link #toBeDeleted}
+     */
+    public synchronized void reset() throws IOException {
+        String filePath = pendingUploads.getAbsolutePath();
+        if (pendingUploads.exists()) {
+            if (!pendingUploads.delete()) {
+                throw new IOException("Failed to delete file [" + filePath
+                    + "]");
+            }
+        }
+        pendingUploads.createNewFile();
+        asyncUploadMap = new HashMap<String, Long>();
+        serializeAsyncUploadMap();
+
+        filePath = toBeDeletedUploads.getAbsolutePath();
+        if (toBeDeletedUploads.exists()) {
+            if (!toBeDeletedUploads.delete()) {
+                throw new IOException("Failed to delete file [" + filePath
+                    + "]");
+            }
+        }
+        toBeDeletedUploads.createNewFile();
+        toBeDeleted = new HashSet<String>();
+        serializeToBeDeleted();
+    }
+
+    /**
+     * Serialize {@link #asyncUploadMap} to local file system.
+     */
+    private synchronized void serializeAsyncUploadMap() throws IOException {
+
+        // use buffering
+        OutputStream fos = new FileOutputStream(pendingUploads);
+        OutputStream buffer = new BufferedOutputStream(fos);
+        ObjectOutput output = new ObjectOutputStream(buffer);
+        try {
+            output.writeObject(asyncUploadMap);
+        } finally {
+            output.close();
+        }
+    }
+
+    /**
+     * Deserialize {@link #asyncUploadMap} from local file system.
+     */
+    private synchronized void deserializeAsyncUploadMap() throws IOException,
+            ClassNotFoundException {
+        // use buffering
+        InputStream fis = new FileInputStream(pendingUploads);
+        InputStream buffer = new BufferedInputStream(fis);
+        ObjectInput input = new ObjectInputStream(buffer);
+        try {
+            asyncUploadMap = (Map<String, Long>) input.readObject();
+        } finally {
+            input.close();
+        }
+    }
+
+    /**
+     * Serialize {@link #toBeDeleted} to local file system.
+     */
+    private synchronized void serializeToBeDeleted() throws IOException {
+
+        // use buffering
+        OutputStream fos = new FileOutputStream(toBeDeletedUploads);
+        OutputStream buffer = new BufferedOutputStream(fos);
+        ObjectOutput output = new ObjectOutputStream(buffer);
+        try {
+            output.writeObject(toBeDeleted);
+        } finally {
+            output.close();
+        }
+    }
+
+    /**
+     * Deserialize {@link #toBeDeleted} from local file system.
+     */
+    private synchronized void deserializeToBeDeleted() throws IOException,
+            ClassNotFoundException {
+        // use buffering
+        InputStream fis = new FileInputStream(toBeDeletedUploads);
+        InputStream buffer = new BufferedInputStream(fis);
+        ObjectInput input = new ObjectInputStream(buffer);
+        try {
+            toBeDeleted = (Set<String>) input.readObject();
+        } finally {
+            input.close();
+        }
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java
------------------------------------------------------------------------------
    svn:eol-style = native
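
A minimal lifecycle sketch for AsyncUploadCache as described in the class
comment above (the home directory, cache path, upload limit, and file name
are illustrative values):

    import java.util.Set;
    import org.apache.jackrabbit.core.data.AsyncUploadCache;
    import org.apache.jackrabbit.core.data.AsyncUploadCacheResult;

    public class AsyncUploadCacheUsage {
        public static void main(String[] args) throws Exception {
            AsyncUploadCache cache = new AsyncUploadCache();
            cache.init("./repository", "./repository/cache", 100);

            // register the upload before handing the file to the backend
            AsyncUploadCacheResult added = cache.add("12/34/5678-file");
            if (added.canAsyncUpload()) {
                // ... the backend uploads the file asynchronously ...
                // on completion, deregister the upload again
                AsyncUploadCacheResult removed = cache.remove("12/34/5678-file");
                if (removed.doRequiresDelete()) {
                    // the record was marked for deletion while the upload
                    // was in flight; the caller must delete the file now
                }
            }
            Set<String> pending = cache.getAll(); // uploads still in progress
            System.out.println("pending uploads: " + pending);
        }
    }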

Added: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import java.io.File;
+
+/**
+ * This class holds the result of an asynchronous upload from
+ * {@link AsyncUploadCache}.
+ */
+public class AsyncUploadCacheResult {
+
+    /**
+     * Flag to indicate that an asynchronous upload can be started on the
+     * file.
+     */
+    private boolean asyncUpload;
+
+    /**
+     * Flag to indicate that the cached file needs to be deleted. This
+     * applies when the file is marked for deletion before its asynchronous
+     * upload completes.
+     */
+    private boolean requiresDelete;
+
+    private File file;
+
+    /**
+     * Returns true if an asynchronous upload can be started on the file.
+     */
+    public boolean canAsyncUpload() {
+        return asyncUpload;
+    }
+
+    public void setAsyncUpload(boolean asyncUpload) {
+        this.asyncUpload = asyncUpload;
+    }
+
+    /**
+     * Flag to indicate that the record is to be deleted from the
+     * {@link DataStore}.
+     */
+    public boolean doRequiresDelete() {
+        return requiresDelete;
+    }
+
+    public void setRequiresDelete(boolean requiresDelete) {
+        this.requiresDelete = requiresDelete;
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+    public void setFile(File file) {
+        this.file = file;
+    }
+
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.core.data;
+
+import java.io.File;
+
+/**
+ * Callback interface whose method is invoked with the status of an
+ * asynchronous upload.
+ */
+public interface AsyncUploadCallback {
+
+    public void call(DataIdentifier identifier, File file, RESULT result);
+
+    public enum RESULT {
+        /**
+         * Asynchronous upload has succeeded.
+         */
+        SUCCESS,
+        /**
+         * Asynchronous upload has failed.
+         */
+        FAILED,
+        /**
+         * Asynchronous upload has been aborted.
+         */
+        ABORTED
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native
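
A sketch of a trivial implementation that only logs the outcome; a real
backend would update its asynchronous-upload bookkeeping here (for example
by removing the corresponding AsyncUploadCache entry):

    import java.io.File;
    import org.apache.jackrabbit.core.data.AsyncUploadCallback;
    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingUploadCallback implements AsyncUploadCallback {
        private static final Logger LOG =
            LoggerFactory.getLogger(LoggingUploadCallback.class);

        public void call(DataIdentifier identifier, File file, RESULT result) {
            switch (result) {
            case SUCCESS:
                LOG.info("async upload of [" + identifier + "] succeeded");
                break;
            case FAILED:
                LOG.warn("async upload of [" + identifier + "] failed");
                break;
            case ABORTED:
                LOG.info("async upload of [" + identifier + "] was aborted");
                break;
            }
        }
    }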


