jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chet...@apache.org
Subject svn commit: r1585461 - in /jackrabbit/trunk: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/ jackrabbit-aws-ext/src/test/resources/ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/ jackrabbit-data/src/test/java/org/apac...
Date Mon, 07 Apr 2014 11:54:37 GMT
Author: chetanm
Date: Mon Apr  7 11:54:37 2014
New Revision: 1585461

URL: http://svn.apache.org/r1585461
Log:
JCR-3754 - [jackrabbit-aws-ext] Add retry logic to S3 asynchronous failed upload

Applying patch from Shashank Gupta

Added:
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadResult.java   (with props)
Modified:
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/log4j.properties
    jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java?rev=1585461&r1=1585460&r2=1585461&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java Mon Apr  7 11:54:37 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.jackrabbit.aws.ext.S3Constants;
 import org.apache.jackrabbit.aws.ext.Utils;
 import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.AsyncUploadResult;
 import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
 import org.apache.jackrabbit.core.data.DataIdentifier;
@@ -44,6 +45,7 @@ import org.apache.jackrabbit.core.data.u
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressListener;
@@ -99,6 +101,8 @@ public class S3Backend implements Backen
     private Properties properties;
 
     private Date startTime;
+    
+    private ThreadPoolExecutor asyncWriteExecuter;
 
     /**
      * Initialize S3Backend. It creates AmazonS3Client and TransferManager from
@@ -135,9 +139,7 @@ public class S3Backend implements Backen
             startTime = new Date();
             Thread.currentThread().setContextClassLoader(
                 getClass().getClassLoader());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("init");
-            }
+            LOG.debug("init");
             this.store = store;
             s3service = Utils.openService(prop);
             if (bucket == null || "".equals(bucket.trim())) {
@@ -157,9 +159,9 @@ public class S3Backend implements Backen
                     s3service.createBucket(bucket, region);
                     endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
                 }
-                LOG.info("Created bucket: " + bucket + " in " + region);
+                LOG.info("Created bucket [{}] in [{}] ", bucket, region);
             } else {
-                LOG.info("Using bucket: " + bucket);
+                LOG.info("Using bucket [{}]", bucket);
                 if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
                     endpoint = S3 + DOT + AWSDOTCOM;
                 } else if (Region.EU_Ireland.toString().equals(region)) {
@@ -178,18 +180,27 @@ public class S3Backend implements Backen
              * redirects it to correct location.
              */
             s3service.setEndpoint(endpoint);
-            LOG.info("S3 service endpoint: " + endpoint);
+            LOG.info("S3 service endpoint [{}] ", endpoint);
 
             int writeThreads = 10;
             String writeThreadsStr = prop.getProperty(S3Constants.S3_WRITE_THREADS);
             if (writeThreadsStr != null) {
                 writeThreads = Integer.parseInt(writeThreadsStr);
             }
-            LOG.info("Using thread pool of [" + writeThreads
-                + "] threads in S3 transfer manager");
+            LOG.info("Using thread pool of [{}] threads in S3 transfer manager.", writeThreads);
             tmx = new TransferManager(s3service,
                 (ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads,
                     new NamedThreadFactory("s3-transfer-manager-worker")));
+            
+            int asyncWritePoolSize = 10;
+            String maxConnsStr = prop.getProperty(S3Constants.S3_MAX_CONNS);
+            if (maxConnsStr != null) {
+                asyncWritePoolSize = Integer.parseInt(maxConnsStr)
+                    - writeThreads;
+            }
+            
+            asyncWriteExecuter = (ThreadPoolExecutor) Executors.newFixedThreadPool(
+                asyncWritePoolSize, new NamedThreadFactory("s3-write-worker"));
             String renameKeyProp = prop.getProperty(S3Constants.S3_RENAME_KEYS);
             boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp))
                     ? true
@@ -197,15 +208,10 @@ public class S3Backend implements Backen
             if (renameKeyBool) {
                 renameKeys();
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("S3 Backend initialized in ["
-                    + (System.currentTimeMillis() - startTime.getTime())
-                    + "] ms");
-            }
+            LOG.debug("S3 Backend initialized in [{}] ms",
+                +(System.currentTimeMillis() - startTime.getTime()));
         } catch (Exception e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("  error ", e);
-            }
+            LOG.debug("  error ", e);
             throw new DataStoreException("Could not initialize S3 from "
                 + prop, e);
         } finally {
@@ -233,8 +239,8 @@ public class S3Backend implements Backen
             throw new IllegalArgumentException(
                 "callback parameter cannot be null in asyncUpload");
         }
-        Thread th = new Thread(new AsyncUploadJob(identifier, file, callback));
-        th.start();
+        asyncWriteExecuter.execute(new AsyncUploadJob(identifier, file,
+            callback));
     }
 
     /**
@@ -251,17 +257,15 @@ public class S3Backend implements Backen
             ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
                 key);
             if (objectMetaData != null) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("exists [" + identifier + "]: [true] took ["
-                        + (System.currentTimeMillis() - start) + "] ms");
-                }
+                LOG.debug("exists [{}]: [true] took [{}] ms.",
+                    identifier, (System.currentTimeMillis() - start) );
                 return true;
             }
             return false;
         } catch (AmazonServiceException e) {
             if (e.getStatusCode() == 404) {
-                LOG.info("exists [" + identifier + "]: [false] took ["
-                    + (System.currentTimeMillis() - start) + "] ms");
+                LOG.debug("exists [{}]: [false] took [{}] ms.",
+                    identifier, (System.currentTimeMillis() - start) );
                 return false;
             }
             throw new DataStoreException(
@@ -293,11 +297,8 @@ public class S3Backend implements Backen
                         key, bucket, key);
                     copReq.setNewObjectMetadata(objectMetaData);
                     s3service.copyObject(copReq);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("[ " + identifier.toString()
-                            + "] touched took ["
-                            + (System.currentTimeMillis() - start) + "] ms");
-                    }
+                    LOG.debug("[{}] touched took [{}] ms. ", identifier,
+                        (System.currentTimeMillis() - start));
                 }
             } else {
                 retVal = false;
@@ -320,11 +321,8 @@ public class S3Backend implements Backen
                 Thread.currentThread().setContextClassLoader(contextClassLoader);
             }
         }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("exists [" + identifier + "]: [" + retVal + "] took ["
-                + (System.currentTimeMillis() - start) + "] ms");
-        }
+        LOG.debug("exists [{}]: [{}] took [{}] ms.", new Object[] { identifier,
+            retVal, (System.currentTimeMillis() - start) });
         return retVal;
     }
 
@@ -339,10 +337,8 @@ public class S3Backend implements Backen
                 getClass().getClassLoader());
             S3Object object = s3service.getObject(bucket, key);
             InputStream in = object.getObjectContent();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[ " + identifier.toString() + "] read took ["
-                    + (System.currentTimeMillis() - start) + "] ms");
-            }
+            LOG.debug("[{}] read took [{}]ms", identifier,
+                (System.currentTimeMillis() - start));
             return in;
         } catch (AmazonServiceException e) {
             throw new DataStoreException("Object not found: " + key, e);
@@ -373,11 +369,8 @@ public class S3Backend implements Backen
                 if (!prevObjectListing.isTruncated()) break;
                 prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getAllIdentifiers returned size [ " + ids.size()
-                    + "] took [" + (System.currentTimeMillis() - start)
-                    + "] ms");
-            }
+            LOG.debug("getAllIdentifiers returned size [{}] took [{}] ms.",
+                ids.size(), (System.currentTimeMillis() - start));
             return ids.iterator();
         } catch (AmazonServiceException e) {
             throw new DataStoreException("Could not list objects", e);
@@ -399,17 +392,16 @@ public class S3Backend implements Backen
                 getClass().getClassLoader());
             ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
             long lastModified = object.getLastModified().getTime();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Identifier [" + identifier.toString()
-                    + "] 's lastModified = [" + lastModified + "] took ["
-                    + (System.currentTimeMillis() - start) + "] ms");
-            }
+            LOG.debug(
+                "Identifier [{}]'s lastModified = [{}] took [{}]ms.",
+                new Object[] { identifier, lastModified,
+                    (System.currentTimeMillis() - start) });
             return lastModified;
         } catch (AmazonServiceException e) {
             if (e.getStatusCode() == 404) {
-                LOG.info("getLastModified:Identifier [" + identifier.toString()
-                    + "] not found. Took ["
-                    + (System.currentTimeMillis() - start) + "]ms");
+                LOG.info(
+                    "getLastModified:Identifier [{}] not found. Took [{}] ms.",
+                    identifier, (System.currentTimeMillis() - start));
             }
             throw new DataStoreException(e);
         } finally {
@@ -429,11 +421,9 @@ public class S3Backend implements Backen
                 getClass().getClassLoader());
             ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
             long length = object.getContentLength();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Identifier [" + identifier.toString()
-                    + "] 's length = [" + length + "] took ["
-                    + (System.currentTimeMillis() - start) + "] ms");
-            }
+            LOG.debug("Identifier [{}]'s length = [{}] took [{}]ms.",
+                new Object[] { identifier, length,
+                    (System.currentTimeMillis() - start) });
             return length;
         } catch (AmazonServiceException e) {
             throw new DataStoreException("Could not length of dataIdentifier "
@@ -455,11 +445,8 @@ public class S3Backend implements Backen
             Thread.currentThread().setContextClassLoader(
                 getClass().getClassLoader());
             s3service.deleteObject(bucket, key);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Identifier [" + identifier.toString()
-                    + "] 's deleted. It took ["
-                    + (System.currentTimeMillis() - start) + "] ms");
-            }
+            LOG.debug("Identifier [{}] deleted. It took [{}]ms.", new Object[] {
+                identifier, (System.currentTimeMillis() - start) });
         } catch (AmazonServiceException e) {
             throw new DataStoreException(
                 "Could not getLastModified of dataIdentifier " + identifier, e);
@@ -489,15 +476,9 @@ public class S3Backend implements Backen
                     DataIdentifier identifier = new DataIdentifier(
                         getIdentifierName(s3ObjSumm.getKey()));
                     long lastModified = s3ObjSumm.getLastModified().getTime();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("id [" + identifier + "], lastModified ["
-                            + lastModified + "]");
-                    }
+                    LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified);
                     if (!store.isInUse(identifier) && lastModified < min) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("add id :" + s3ObjSumm.getKey()
-                                + " to delete lists");
-                        }
+                        LOG.debug("add id [{}] to delete lists",  s3ObjSumm.getKey());
                         deleteList.add(new DeleteObjectsRequest.KeyVersion(
                             s3ObjSumm.getKey()));
                         deleteIdSet.add(identifier);
@@ -514,10 +495,8 @@ public class S3Backend implements Backen
                                 + dobjs.getDeletedObjects().size() + " out of "
                                 + deleteList.size() + " are deleted");
                     } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(deleteList
-                                + " records deleted from datastore");
-                        }
+                        LOG.debug("[{}] records deleted from datastore",
+                            deleteList);
                     }
                 }
                 if (!prevObjectListing.isTruncated()) {
@@ -530,10 +509,10 @@ public class S3Backend implements Backen
                 Thread.currentThread().setContextClassLoader(contextClassLoader);
             }
         }
-        LOG.info("deleteAllOlderThan: min=[" + min + "] exit. Deleted ["
-            + deleteIdSet + "] records. Number of records deleted ["
-            + deleteIdSet.size() + "] took ["
-            + (System.currentTimeMillis() - start) + "] ms");
+        LOG.info(
+            "deleteAllOlderThan: min=[{}] exit. Deleted[{}] records. Number of records deleted [{}] took [{}]ms",
+            new Object[] { min, deleteIdSet, deleteIdSet.size(),
+                (System.currentTimeMillis() - start) });
         return deleteIdSet;
     }
 
@@ -543,6 +522,7 @@ public class S3Backend implements Backen
         tmx.abortMultipartUploads(bucket, startTime);
         tmx.shutdownNow();
         s3service.shutdown();
+        asyncWriteExecuter.shutdownNow();
         LOG.info("S3Backend closed.");
     }
 
@@ -588,21 +568,15 @@ public class S3Backend implements Backen
                     throw new DataStoreException("Collision: " + key
                         + " new length: " + file.length() + " old length: " + l);
                 }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(key + "   exists, lastmodified ="
-                        + objectMetaData.getLastModified().getTime());
-                }
+                LOG.debug("[{}]'s exists, lastmodified = [{}]", key,
+                    objectMetaData.getLastModified().getTime());
                 CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
                     bucket, key);
                 copReq.setNewObjectMetadata(objectMetaData);
                 s3service.copyObject(copReq);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("lastModified of " + identifier.toString()
-                        + " updated successfully");
-                }
+                LOG.debug("lastModified of [{}] updated successfully.", identifier);
                 if (callback != null) {
-                    callback.call(identifier, file,
-                        AsyncUploadCallback.RESULT.SUCCESS);
+                    callback.onSuccess(new AsyncUploadResult(identifier, file));
                 }
             }
 
@@ -613,27 +587,22 @@ public class S3Backend implements Backen
                         file));
                     // wait for upload to finish
                     if (asyncUpload) {
-                        up.addProgressListener(new S3UploadProgressListener(
+                        up.addProgressListener(new S3UploadProgressListener(up,
                             identifier, file, callback));
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("added upload progress listener to identifier ["
-                                + identifier + "]");
-                        }
+                        LOG.debug(
+                            "added upload progress listener to identifier [{}]",
+                            identifier);
                     } else {
                         up.waitForUploadResult();
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("synchronous upload to identifier ["
-                                + identifier + "] completed.");
-                        }
+                        LOG.debug("synchronous upload to identifier [{}] completed.", identifier); 
                         if (callback != null) {
-                            callback.call(identifier, file,
-                                AsyncUploadCallback.RESULT.SUCCESS);
+                            callback.onSuccess(new AsyncUploadResult(
+                                identifier, file));
                         }
                     }
                 } catch (Exception e2) {
                     if (!asyncUpload) {
-                        callback.call(identifier, file,
-                            AsyncUploadCallback.RESULT.ABORTED);
+                        callback.onAbort(new AsyncUploadResult(identifier, file));
                     }
                     throw new DataStoreException("Could not upload " + key, e2);
                 }
@@ -643,11 +612,10 @@ public class S3Backend implements Backen
                 Thread.currentThread().setContextClassLoader(contextClassLoader);
             }
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("write [" + identifier + "] length [" + file.length()
-                + "], in async mode [" + asyncUpload + "] in ["
-                + (System.currentTimeMillis() - start) + "] ms.");
-        }
+        LOG.debug(
+            "write of [{}], length=[{}], in async mode [{}], in [{}]ms",
+            new Object[] { identifier, file.length(), asyncUpload,
+                (System.currentTimeMillis() - start) });
     }
 
     /**
@@ -694,8 +662,8 @@ public class S3Backend implements Backen
             } catch (InterruptedException ie) {
 
             }
-            LOG.info("Renamed [" + count + "] keys, time taken ["
-                + ((System.currentTimeMillis() - startTime) / 1000) + "] sec");
+            LOG.info("Renamed [{}] keys, time taken [{}]sec", count,
+                ((System.currentTimeMillis() - startTime) / 1000));
             // Delete older keys.
             if (deleteList.size() > 0) {
                 DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
@@ -706,9 +674,10 @@ public class S3Backend implements Backen
                     delObjsReq.setKeys(Collections.unmodifiableList(deleteList.subList(
                         startIndex, endIndex)));
                     DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
-                    LOG.info("Records[" + dobjs.getDeletedObjects().size()
-                        + "] deleted in datastore from index [" + startIndex
-                        + "] to [" + (endIndex - 1) + "]");
+                    LOG.info(
+                        "Records[{}] deleted in datastore from index [{}] to [{}]",
+                        new Object[] { dobjs.getDeletedObjects().size(),
+                            startIndex, (endIndex - 1) });
                     if (endIndex == size) {
                         break;
                     } else {
@@ -775,9 +744,7 @@ public class S3Backend implements Backen
                 CopyObjectRequest copReq = new CopyObjectRequest(bucket,
                     oldKey, bucket, newS3Key);
                 s3service.copyObject(copReq);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(oldKey + " renamed to " + newS3Key);
-                }
+                LOG.debug("[{}] renamed to [{}] ", oldKey, newS3Key);
             } finally {
                 if (contextClassLoader != null) {
                     Thread.currentThread().setContextClassLoader(
@@ -801,24 +768,35 @@ public class S3Backend implements Backen
         private DataIdentifier identifier;
 
         private AsyncUploadCallback callback;
+        
+        private Upload upload;
 
-        public S3UploadProgressListener(DataIdentifier identifier, File file,
+        public S3UploadProgressListener(Upload upload, DataIdentifier identifier, File file,
                 AsyncUploadCallback callback) {
             super();
             this.identifier = identifier;
             this.file = file;
             this.callback = callback;
+            this.upload = upload;
         }
 
         public void progressChanged(ProgressEvent progressEvent) {
             switch (progressEvent.getEventCode()) {
                 case ProgressEvent.COMPLETED_EVENT_CODE:
-                    callback.call(identifier, file,
-                        AsyncUploadCallback.RESULT.SUCCESS);
+                    callback.onSuccess(new AsyncUploadResult(identifier, file));
                     break;
                 case ProgressEvent.FAILED_EVENT_CODE:
-                    callback.call(identifier, file,
-                        AsyncUploadCallback.RESULT.FAILED);
+                    AsyncUploadResult result = new AsyncUploadResult(
+                        identifier, file);
+                    try {
+                        AmazonClientException e = upload.waitForException();
+                        if (e != null) {
+                            result.setException(e);
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    callback.onFailure(result);
                     break;
                 default:
                     break;

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/log4j.properties?rev=1585461&r1=1585460&r2=1585461&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/log4j.properties (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/log4j.properties Mon Apr  7 11:54:37 2014
@@ -17,6 +17,8 @@
 
 # this is the log4j configuration for the JCR API tests
 log4j.rootLogger=INFO, file
+log4j.logger.org.apache.jackrabbit.core.data=DEBUG
+log4j.logger.org.apache.jackrabbit.aws.ext.ds=DEBUG
 
 #log4j.logger.org.apache.jackrabbit.test=DEBUG
 

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml?rev=1585461&r1=1585460&r2=1585461&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/test/resources/repository_sample.xml Mon Apr  7 11:54:37 2014
@@ -43,6 +43,7 @@
         <param name="continueOnAsyncUploadFailure" value="false"/>
         <param name="concurrentUploadsThreads" value="10"/>
         <param name="asyncUploadLimit" value="100"/>
+        <param name="uploadRetries" value="3"/>
       </DataStore>
     <!--
         sample database data store configuration

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java?rev=1585461&r1=1585460&r2=1585461&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java Mon Apr  7 11:54:37 2014
@@ -17,27 +17,24 @@
 
 package org.apache.jackrabbit.core.data;
 
-import java.io.File;
-
 /**
- * This interface defines method which would be called along with status.
+ * This interface defines callback methods to reflect the status of asynchronous
+ * upload.
  */
 public interface AsyncUploadCallback {
-
-    public void call(DataIdentifier identifier, File file, RESULT result);
-
-    public enum RESULT {
-        /**
-         * Asynchronous upload has succeeded.
-         */
-        SUCCESS,
-        /**
-         * Asynchronous upload has failed.
-         */
-        FAILED,
-        /**
-         * Asynchronous upload has been aborted.
-         */
-        ABORTED
-    };
+    
+    /**
+     * Callback method for successful asynchronous upload.
+     */
+    public void onSuccess(AsyncUploadResult result);
+    
+    /**
+     * Callback method for failed asynchronous upload.
+     */
+    public void onFailure(AsyncUploadResult result);
+    
+    /**
+     * Callback method for aborted asynchronous upload.
+     */
+    public void onAbort(AsyncUploadResult result);
 }

Added: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadResult.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadResult.java?rev=1585461&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadResult.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadResult.java Mon Apr  7 11:54:37 2014
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import java.io.File;
+
+/**
+ * 
+ * The class holds the result of asynchronous upload to {@link Backend}
+ */
+public class AsyncUploadResult {
+    /**
+     * {@link DataIdentifier} on which asynchronous upload is initiated.
+     */
+    private final DataIdentifier identifier;
+    
+    /**
+     * {@link File} which is asynchronously uploaded.
+     */
+    private final File file;
+    
+    /**
+     * Any {@link Exception} which is raised during asynchronous upload.
+     */
+    private Exception exception;
+    
+    public AsyncUploadResult(DataIdentifier identifier, File file) {
+        super();
+        this.identifier = identifier;
+        this.file = file;
+    }
+
+    public DataIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadResult.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java?rev=1585461&r1=1585460&r2=1585461&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java Mon Apr  7 11:54:37 2014
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -73,6 +74,7 @@ import org.slf4j.LoggerFactory;
  *     &lt;param name="{@link #setContinueOnAsyncUploadFailure(boolean) continueOnAsyncUploadFailure}" value="false"/>
  *     &lt;param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/>
  *     &lt;param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/>
+ *     &lt;param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/>
  * &lt/DataStore>
  */
 public abstract class CachingDataStore extends AbstractDataStore implements
@@ -101,6 +103,15 @@ public abstract class CachingDataStore e
      * are garbage collected.
      */
     protected Map<DataIdentifier, WeakReference<DataIdentifier>> inUse = Collections.synchronizedMap(new WeakHashMap<DataIdentifier, WeakReference<DataIdentifier>>());
+    
+    /**
+     * In-memory map holding each failed asynchronous upload's
+     * {@link DataIdentifier} and its retry count. Once all retries are
+     * exhausted or the file is successfully uploaded, the corresponding
+     * entry is flushed from the map. As all failed uploads are uploaded
+     * synchronously at startup, this map need not be persisted.
+     */
+    protected final Map<DataIdentifier, Integer> uploadRetryMap = new ConcurrentHashMap<DataIdentifier, Integer>(5);
 
     protected Backend backend;
 
@@ -145,6 +156,11 @@ public abstract class CachingDataStore e
      * The number of bytes in the cache. The default value is 64 GB.
      */
     private long cacheSize = 64L * 1024 * 1024 * 1024;
+    
+    /**
+     * The number of retries for failed upload.
+     */
+    private int uploadRetries = 3;
 
     /**
      * The local file system cache.
@@ -209,12 +225,12 @@ public abstract class CachingDataStore e
                 tmpDir = new File(path, "/repository/s3tmp");
                 path = path + "/repository/datastore";
             }
-            LOG.info("path=[" + path + ",] tmpPath= [" + tmpDir.getPath() + "]");
+            LOG.info("path=[{}],  tmpPath=[{}]", path, tmpDir.getPath());
             directory = new File(path);
             mkdirs(directory);
             if (!mkdirs(tmpDir)) {
                 FileUtils.cleanDirectory(tmpDir);
-                LOG.info("tmp = " + tmpDir.getPath() + " cleaned");
+                LOG.info("tmp=[{}] cleaned.", tmpDir.getPath());
             }
 
             asyncWriteCache = new AsyncUploadCache();
@@ -238,24 +254,25 @@ public abstract class CachingDataStore e
                                 + markerFile.getAbsolutePath(), e);
                     }
                 } else {
-                    LOG.info("marker file = " + markerFile.getAbsolutePath()
-                        + " exists");
+                    LOG.info("marker file = [{}] exists ",
+                        markerFile.getAbsolutePath());
                 }
             }
             // upload any leftover async uploads to backend during last shutdown
             Set<String> fileList = asyncWriteCache.getAll();
             if (fileList != null && !fileList.isEmpty()) {
                 List<String> errorFiles = new ArrayList<String>();
-                LOG.info("Uploading [" + fileList + "]  and size ["
-                    + fileList.size() + "] from AsyncUploadCache.");
+                LOG.info("Uploading [{}] and size=[{}] from AsyncUploadCache.",
+                    fileList, fileList.size());
                 long totalSize = 0;
                 List<File> files = new ArrayList<File>(fileList.size());
                 for (String fileName : fileList) {
                     File f = new File(path, fileName);
                     if (!f.exists()) {
                         errorFiles.add(fileName);
-                        LOG.error("Cannot upload pending file ["
-                            + f.getAbsolutePath() + "]. File doesn't exist.");
+                        LOG.error(
+                            "Cannot upload pending file [{}]. File doesn't exist.",
+                            f.getAbsolutePath());
                     } else {
                         totalSize += f.length();
                         files.add(new File(homeDir, fileName));
@@ -264,18 +281,22 @@ public abstract class CachingDataStore e
                 new FilesUploader(files, totalSize, concurrentUploadsThreads,
                     true).upload();
                 if (!continueOnAsyncUploadFailure && errorFiles.size() > 0) {
-                    LOG.error("Pending uploads of files [" + errorFiles
-                        + "] failed. Files do not exist in Local cache.");
-                    LOG.error("To continue set [continueOnAsyncUploadFailure] to true in Datastore configuration in repository.xml."
-                        + " There would be inconsistent data in repository due the missing files. ");
+                    LOG.error(
+                        "Pending uploads of files [{}] failed. Files do not exist in Local cache.",
+                        errorFiles);
+                    LOG.error("To continue set [continueOnAsyncUploadFailure] "
+                        + "to true in Datastore configuration in "
+                        + "repository.xml. There would be inconsistent data "
+                        + "in repository due the missing files. ");
                     throw new RepositoryException(
                         "Cannot upload async uploads from local cache. Files not found.");
                 } else {
                     if (errorFiles.size() > 0) {
-                        LOG.error("Pending uploads of files ["
-                            + errorFiles
-                            + "] failed. Files do not exist"
-                            + " in Local cache. Continuing as [continueOnAsyncUploadFailure] is set to true.");
+                        LOG.error(
+                            "Pending uploads of files [{}] failed. Files do" +
+                            " not exist in Local cache. Continuing as " +
+                            "[continueOnAsyncUploadFailure] is set to true.",
+                            errorFiles);
                     }
                     LOG.info("Reseting AsyncWrite Cache list.");
                     asyncWriteCache.reset();
@@ -324,10 +345,8 @@ public abstract class CachingDataStore e
             long currTime = System.currentTimeMillis();
             DataIdentifier identifier = new DataIdentifier(
                 encodeHexString(digest.digest()));
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getting SHA1 hash  [" + identifier + "] length ["
-                    + length + "],   in [" + (currTime - startTime) + "] ms.");
-            }
+            LOG.debug("SHA1 of [{}], length =[{}] took [{}]ms ",
+                new Object[] { identifier, length, (currTime - startTime) });
             String fileName = getFileName(identifier);
             AsyncUploadCacheResult result = null;
             synchronized (this) {
@@ -337,10 +356,8 @@ public abstract class CachingDataStore e
                     result = cache.store(fileName, temporary, true);
                 }
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("storing  [" + identifier + "] in localCache took ["
-                    + (System.currentTimeMillis() - currTime) + "] ms.");
-            }
+            LOG.debug("storing  [{}] in localCache took [{}] ms", identifier,
+                (System.currentTimeMillis() - currTime));
             if (result != null) {
                 if (result.canAsyncUpload()) {
                     backend.writeAsync(identifier, result.getFile(), this);
@@ -351,11 +368,9 @@ public abstract class CachingDataStore e
             // this will also make sure that
             // tempId is not garbage collected until here
             inUse.remove(tempId);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("write [" + identifier + "] length [" + length
-                    + "],   in [" + (System.currentTimeMillis() - startTime)
-                    + "] ms.");
-            }
+            LOG.debug("addRecord [{}] of length [{}] took [{}]ms.",
+                new Object[] { identifier, length,
+                    (System.currentTimeMillis() - startTime) });
             return new CachingDataRecord(this, identifier);
         } catch (NoSuchAlgorithmException e) {
             throw new DataStoreException(DIGEST + " not available", e);
@@ -427,7 +442,7 @@ public abstract class CachingDataStore e
 
     @Override
     public void updateModifiedDateOnAccess(long before) {
-        LOG.info("minModifiedDate set to: " + before);
+        LOG.info("minModifiedDate set to [{}]", before);
         minModifiedDate = before;
     }
 
@@ -483,8 +498,9 @@ public abstract class CachingDataStore e
         } catch (IOException e) {
             throw new DataStoreException(e);
         }
-        LOG.info("deleteAllOlderThan  exit. Deleted [" + diSet
-            + "] records. Number of records deleted [" + diSet.size() + "]");
+        LOG.info(
+            "deleteAllOlderThan exit. Deleted [{}] records. Number of records deleted [{}]",
+            diSet, diSet.size());
         return diSet.size();
     }
 
@@ -517,21 +533,19 @@ public abstract class CachingDataStore e
      */
     public long getLastModified(DataIdentifier identifier)
             throws DataStoreException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("accessed lastModified of identifier:" + identifier);
-        }
         String fileName = getFileName(identifier);
         long lastModified = asyncWriteCache.getLastModified(fileName);
         if (lastModified != 0) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getlastModified of identifier [" + identifier
-                    + "] from AsyncUploadCache = " + lastModified);
-            }
-            return lastModified;
+            LOG.debug(
+                "identifier [{}]'s lastModified retrieved from AsyncUploadCache ",
+                identifier);
 
         } else {
-            return backend.getLastModified(identifier);
+            lastModified =  backend.getLastModified(identifier);
         }
+        LOG.debug("identifier= [{}], lastModified=[{}]", identifier,
+            lastModified);
+        return lastModified;
     }
 
     /**
@@ -575,49 +589,93 @@ public abstract class CachingDataStore e
     public Set<String> getPendingUploads() {
         return asyncWriteCache.getAll();
     }
+    
+    @Override
+    public void onSuccess(AsyncUploadResult result) {
+        DataIdentifier identifier = result.getIdentifier();
+        File file = result.getFile();
+        String fileName = getFileName(identifier);
+        try {
+            LOG.debug("Upload completed for [{}]", identifier);
+            // remove from failed upload map if any.
+            uploadRetryMap.remove(identifier);
+            AsyncUploadCacheResult cachedResult = asyncWriteCache.remove(fileName);
+            if (cachedResult.doRequiresDelete()) {
+                // added record already marked for delete
+                deleteRecord(identifier);
+            }
+        } catch (IOException ie) {
+            LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+                + identifier + "], file [" + file.getAbsolutePath() + "]", ie);
+        } catch (DataStoreException dse) {
+            LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+                + identifier + "], file [" + file.getAbsolutePath() + "]", dse);
+        }
+    }
 
-    public void call(DataIdentifier identifier, File file,
-            AsyncUploadCallback.RESULT resultCode) {
+    @Override
+    public void onFailure(AsyncUploadResult result) {
+        DataIdentifier identifier = result.getIdentifier();
+        File file = result.getFile();
         String fileName = getFileName(identifier);
-        if (AsyncUploadCallback.RESULT.SUCCESS.equals(resultCode)) {
-            try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Upload completed. [" + identifier + "].");
-                }
-                AsyncUploadCacheResult result = asyncWriteCache.remove(fileName);
-                if (result.doRequiresDelete()) {
-                    // added record already marked for delete
-                    deleteRecord(identifier);
-                }
-            } catch (IOException ie) {
-                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
-                    + identifier + "], file [" + file.getAbsolutePath() + "]",
-                    ie);
-            } catch (DataStoreException dse) {
-                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
-                    + identifier + "], file [" + file.getAbsolutePath() + "]",
-                    dse);
-            }
-        } else if (AsyncUploadCallback.RESULT.FAILED.equals(resultCode)) {
-            LOG.error("Async Upload failed. Dataidentifer [ " + identifier
+        if (result.getException() != null) {
+            LOG.warn("Async Upload failed. Dataidentifer [ " + identifier
+                + "], file [" + file.getAbsolutePath() + "]",
+                result.getException());
+        } else {
+            LOG.warn("Async Upload failed. Dataidentifer [ " + identifier
                 + "], file [" + file.getAbsolutePath() + "]");
-        } else if (AsyncUploadCallback.RESULT.ABORTED.equals(resultCode)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Async Upload Aborted. Dataidentifer [ " + identifier
-                    + "], file [" + file.getAbsolutePath() + "]");
-            }
-            try {
-                asyncWriteCache.remove(fileName);
-                LOG.info("Async Upload Aborted. Dataidentifer [ " + identifier
-                    + "], file [" + file.getAbsolutePath() + "] removed.");
-            } catch (IOException ie) {
-                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
-                    + identifier + "], file [" + file.getAbsolutePath() + "]",
-                    ie);
+        }
+        // Retry failed upload upto uploadRetries times.
+        try {
+            if (asyncWriteCache.hasEntry(fileName, false)) {
+                synchronized (uploadRetryMap) {
+                    Integer retry = uploadRetryMap.get(identifier);
+                    if (retry == null) {
+                        retry = new Integer(1);
+                    } else {
+                        retry++;
+                    }
+                    if (retry <= uploadRetries) {
+                        uploadRetryMap.put(identifier, retry);
+                        LOG.info(
+                            "Retrying [{}] time(s) failed upload for dataidentifier [{}]",
+                            retry, identifier);
+                        try {
+                            backend.writeAsync(identifier, file, this);
+                        } catch (DataStoreException e) {
+                            LOG.warn("exception", e);
+                        }
+                    } else {
+                        LOG.info("Retries [{}] exhausted for dataidentifier [{}].",
+                            (retry - 1), identifier);
+                        uploadRetryMap.remove(identifier);
+                    }
+                }
             }
+        } catch (IOException ie) {
+            LOG.warn("Cannot retry failed async file upload. Dataidentifer [ "
+                + identifier + "], file [" + file.getAbsolutePath() + "]", ie);
+        }
+    }
+
+    @Override
+    public void onAbort(AsyncUploadResult result) {
+        DataIdentifier identifier = result.getIdentifier();
+        File file = result.getFile();
+        String fileName = getFileName(identifier);
+        try {
+            asyncWriteCache.remove(fileName);
+            LOG.info(
+                "Async Upload Aborted. Dataidentifer [{}], file [{}] removed from AsyncCache.",
+                identifier, file.getAbsolutePath());
+        } catch (IOException ie) {
+            LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+                + identifier + "], file [" + file.getAbsolutePath() + "]", ie);
         }
     }
 
+
     /**
      * Returns a unique temporary file to be used for creating a new data
      * record.
@@ -636,44 +694,45 @@ public abstract class CachingDataStore e
         for (File f : files) {
             totalSize += f.length();
         }
-        if (concurrentUploadsThreads > 1) {
-            new FilesUploader(files, totalSize, concurrentUploadsThreads, false).upload();
-        } else {
-            uploadFilesInSingleThread(files, totalSize);
+        if (files.size() > 0) {
+            if (concurrentUploadsThreads > 1) {
+                new FilesUploader(files, totalSize, concurrentUploadsThreads,
+                    false).upload();
+            } else {
+                uploadFilesInSingleThread(files, totalSize);
+            }
         }
     }
 
     private void uploadFilesInSingleThread(List<File> files, long totalSize)
             throws RepositoryException {
         long startTime = System.currentTimeMillis();
-        LOG.info("Upload:  {" + files.size() + "} files in single thread.");
+        LOG.info("Upload:  [{}] files in single thread.", files.size());
         long currentCount = 0;
         long currentSize = 0;
         long time = System.currentTimeMillis();
         for (File f : files) {
-            long now = System.currentTimeMillis();
-            if (now > time + 5000) {
-                LOG.info("Uploaded:  {" + currentCount + "}/{" + files.size()
-                    + "} files, {" + currentSize + "}/{" + totalSize
-                    + "} size data");
-                time = now;
-            }
             String name = f.getName();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("upload file = " + name);
-            }
+            LOG.debug("upload file [{}] ", name);
             if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
                 && f.length() > 0) {
                 uploadFileToBackEnd(f, false);
             }
             currentSize += f.length();
             currentCount++;
+            long now = System.currentTimeMillis();
+            if (now > time + 5000) {
+                LOG.info("Uploaded:  [{}/{}] files, [{}/{}] size data",
+                    new Object[] { currentCount, files.size(), currentSize,
+                        totalSize });
+                time = now;
+            }
         }
         long endTime = System.currentTimeMillis();
-        LOG.info("Uploaded:  {" + currentCount + "}/{" + files.size()
-            + "} files, {" + currentSize + "}/{" + totalSize
-            + "} size data, time taken {" + ((endTime - startTime) / 1000)
-            + "} sec");
+        LOG.info(
+            "Uploaded:  [{}/{}] files, [{}/{}] size data, time taken = [{}] sec",
+            new Object[] { currentCount, files.size(), currentSize, totalSize,
+                ((endTime - startTime) / 1000) });
     }
 
     /**
@@ -709,9 +768,7 @@ public abstract class CachingDataStore e
                 String fileName = getFileName(identifier);
                 asyncWriteCache.remove(fileName);
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(f.getName() + "uploaded.");
-            }
+            LOG.debug("uploaded [{}]", f.getName());
         } catch (IOException ioe) {
             throw new DataStoreException(ioe);
         }
@@ -910,6 +967,14 @@ public abstract class CachingDataStore e
             boolean continueOnAsyncUploadFailure) {
         this.continueOnAsyncUploadFailure = continueOnAsyncUploadFailure;
     }
+    
+    public int getUploadRetries() {
+        return uploadRetries;
+    }
+
+    public void setUploadRetries(int uploadRetries) {
+        this.uploadRetries = uploadRetries;
+    }
 
     public Backend getBackend() {
         return backend;
@@ -962,15 +1027,14 @@ public abstract class CachingDataStore e
         }
 
         void logProgress() {
-            LOG.info("Uploaded:  {" + currentCount.get() + "}/{" + files.size()
-                + "} files, {" + currentSize.get() + "}/{" + totalSize
-                + "} size data");
+            LOG.info("Uploaded:  [{}/{}] files, [{}/{}] size data",
+                new Object[] { currentCount, files.size(), currentSize,
+                    totalSize });
         }
 
         void upload() throws DataStoreException {
             long startTime = System.currentTimeMillis();
-            LOG.info(" Uploading " + files.size() + " using " + threads
-                + " threads.");
+            LOG.info(" Uploading [{}] using [{}] threads.", files.size(), threads);
             ExecutorService executor = Executors.newFixedThreadPool(threads,
                 new NamedThreadFactory("backend-file-upload-worker"));
             int partitionSize = files.size() / (threads);
@@ -1005,10 +1069,10 @@ public abstract class CachingDataStore e
 
             }
             long endTime = System.currentTimeMillis();
-            LOG.info("Uploaded:  {" + currentCount.get() + "}/{" + files.size()
-                + "} files, {" + currentSize.get() + "}/{" + totalSize
-                + "} size data, time taken {" + ((endTime - startTime) / 1000)
-                + "} sec");
+            LOG.info(
+                "Uploaded:  [{}/{}] files, [{}/{}] size data, time taken = [{}] sec",
+                new Object[] { currentCount, files.size(), currentSize,
+                    totalSize, ((endTime - startTime) / 1000) });
             if (isExceptionRaised()) {
                 executor.shutdownNow(); // Cancel currently executing tasks
                 throw exception;
@@ -1044,12 +1108,10 @@ public abstract class CachingDataStore e
 
         public void run() {
             long time = System.currentTimeMillis();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Thread [ " + Thread.currentThread().getName()
-                    + "]: Uploading files from startIndex[" + startIndex
-                    + "] and endIndex [" + (endIndex - 1)
-                    + "], both inclusive.");
-            }
+            LOG.debug(
+                "Thread [{}] : Uploading files from startIndex [{}] to endIndex [{}] both inclusive.",
+                new Object[] { Thread.currentThread().getName(), startIndex,
+                    (endIndex - 1) });
             int uploadCount = 0;
             long uploadSize = 0;
             try {
@@ -1059,9 +1121,7 @@ public abstract class CachingDataStore e
                         break;
                     }
                     String name = f.getName();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("upload file = " + name);
-                    }
+                    LOG.debug("upload file [{}] ",name);
                     if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
                         && f.length() > 0) {
                         uploadFileToBackEnd(f, updateAsyncCache);

Modified: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java?rev=1585461&r1=1585460&r2=1585461&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java Mon Apr  7 11:54:37 2014
@@ -158,13 +158,12 @@ public class InMemoryBackend implements 
             timeMap.put(identifier, System.currentTimeMillis());
         } catch (IOException e) {
             if (async) {
-                callback.call(identifier, file,
-                    AsyncUploadCallback.RESULT.ABORTED);
+                callback.onAbort(new AsyncUploadResult(identifier, file));
             }
             throw new DataStoreException(e);
         }
         if (async) {
-            callback.call(identifier, file, AsyncUploadCallback.RESULT.SUCCESS);
+            callback.onSuccess(new AsyncUploadResult(identifier, file));
         }
     }
 



Mime
View raw message