jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dpfis...@apache.org
Subject svn commit: r1577127 [2/3] - in /jackrabbit/trunk: jackrabbit-aws-ext/ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/ jackrabbit-aws-ext/src/test/java/org/apache/jackra...
Date Thu, 13 Mar 2014 12:12:34 GMT
Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java Thu Mar 13 12:12:33 2014
@@ -20,9 +20,7 @@ package org.apache.jackrabbit.core.data;
 import java.io.File;
 import java.io.InputStream;
 import java.util.Iterator;
-import java.util.List;
-
-
+import java.util.Set;
 
 /**
  * The interface defines the backend which can be plugged into
@@ -33,9 +31,12 @@ public interface Backend {
     /**
      * This method initialize backend with the configuration.
      * 
-     * @param store {@link CachingDataStore}
-     * @param homeDir path of repository home dir.
-     * @param config path of config property file.
+     * @param store
+     *            {@link CachingDataStore}
+     * @param homeDir
+     *            path of repository home dir.
+     * @param config
+     *            path of config property file.
      * @throws DataStoreException
      */
     void init(CachingDataStore store, String homeDir, String config)
@@ -44,27 +45,33 @@ public interface Backend {
     /**
      * Return inputstream of record identified by identifier.
      * 
-     * @param identifier identifier of record.
+     * @param identifier
+     *            identifier of record.
      * @return inputstream of the record.
-     * @throws DataStoreException if record not found or any error.
+     * @throws DataStoreException
+     *             if record not found or any error.
      */
     InputStream read(DataIdentifier identifier) throws DataStoreException;
 
     /**
      * Return length of record identified by identifier.
      * 
-     * @param identifier identifier of record.
+     * @param identifier
+     *            identifier of record.
      * @return length of the record.
-     * @throws DataStoreException if record not found or any error.
+     * @throws DataStoreException
+     *             if record not found or any error.
      */
     long getLength(DataIdentifier identifier) throws DataStoreException;
 
     /**
      * Return lastModified of record identified by identifier.
      * 
-     * @param identifier identifier of record.
+     * @param identifier
+     *            identifier of record.
      * @return lastModified of the record.
-     * @throws DataStoreException if record not found or any error.
+     * @throws DataStoreException
+     *             if record not found or any error.
      */
     long getLastModified(DataIdentifier identifier) throws DataStoreException;
 
@@ -72,30 +79,54 @@ public interface Backend {
      * Stores file to backend with identifier used as key. If key pre-exists, it
      * updates the timestamp of the key.
      * 
-     * @param identifier key of the file 
-     * @param file file that would be stored in backend.
-     * @throws DataStoreException for any error.
+     * @param identifier
+     *            key of the file
+     * @param file
+     *            file that would be stored in backend.
+     * @throws DataStoreException
+     *             for any error.
      */
     void write(DataIdentifier identifier, File file) throws DataStoreException;
 
     /**
-     * Returns identifiers of all records that exists in backend. 
+     * Write file to backend in asynchronous mode. Backend implmentation may
+     * choose not to write asynchronously but it requires to call
+     * {@link AsyncUploadCallback#call(DataIdentifier, File, com.day.crx.cloud.s3.ds.AsyncUploadCallback.RESULT)}
+     * after upload succeed or failed.
+     * 
+     * @param identifier
+     * @param file
+     * @param callback
+     *            Callback interface to called after upload succeed or failed.
+     * @throws DataStoreException
+     */
+    void writeAsync(DataIdentifier identifier, File file,
+            AsyncUploadCallback callback) throws DataStoreException;
+
+    /**
+     * Returns identifiers of all records that exists in backend.
+     * 
      * @return iterator consisting of all identifiers
      * @throws DataStoreException
      */
     Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException;
 
     /**
-     * Update timestamp of record identified by identifier if minModifiedDate is
-     * greater than record's lastModified else no op.
+     * This method check the existence of record in backend. Return true if
+     * records exists else false. This method also touch record identified by
+     * identifier if touch is true.
      * 
-     * @throws DataStoreException if record not found.
+     * @param identifier
+     * @throws DataStoreException
      */
-    void touch(DataIdentifier identifier, long minModifiedDate)
+    boolean exists(DataIdentifier identifier, boolean touch)
             throws DataStoreException;
+
     /**
-     * This method check the existence of record in backend. 
-     * @param identifier identifier to be checked. 
+     * This method check the existence of record in backend.
+     * 
+     * @param identifier
+     *            identifier to be checked.
      * @return true if records exists else false.
      * @throws DataStoreException
      */
@@ -103,22 +134,27 @@ public interface Backend {
 
     /**
      * Close backend and release resources like database connection if any.
+     * 
      * @throws DataStoreException
      */
     void close() throws DataStoreException;
 
     /**
      * Delete all records which are older than timestamp.
+     * 
      * @param timestamp
-     * @return list of identifiers which are deleted. 
+     * @return {@link Set} of identifiers which are deleted.
      * @throws DataStoreException
      */
-    List<DataIdentifier> deleteAllOlderThan(long timestamp) throws DataStoreException;
+    Set<DataIdentifier> deleteAllOlderThan(long timestamp)
+            throws DataStoreException;
 
     /**
      * Delete record identified by identifier. No-op if identifier not found.
+     * 
      * @param identifier
      * @throws DataStoreException
      */
     void deleteRecord(DataIdentifier identifier) throws DataStoreException;
 }
+

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java Thu Mar 13 12:12:33 2014
@@ -19,6 +19,8 @@ package org.apache.jackrabbit.core.data;
 
 import java.io.InputStream;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CachingDataRecord which stores reference to {@link CachingDataStore}. This
@@ -27,6 +29,8 @@ import java.io.InputStream;
  */
 public class CachingDataRecord extends AbstractDataRecord {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CachingDataRecord.class);
+
     private final CachingDataStore store;
 
     public CachingDataRecord(CachingDataStore store, DataIdentifier identifier) {
@@ -39,6 +43,8 @@ public class CachingDataRecord extends A
         try {
             return store.getLastModified(getIdentifier());
         } catch (DataStoreException dse) {
+            LOG.info("exception in getLastModified for identifier ["
+                + getIdentifier() + "]. returning 0.", dse);
             return 0;
         }
     }

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java Thu Mar 13 12:12:33 2014
@@ -29,15 +29,24 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.WeakHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jcr.RepositoryException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,10 +70,13 @@ import org.slf4j.LoggerFactory;
  *     &lt;param name="{@link #setCachePurgeTrigFactor(double)}" value="0.95d"/>
  *     &lt;param name="{@link #setCachePurgeResizeFactor(double) cacheSize}" value="0.85d"/>
  *     &lt;param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
+ *     &lt;param name="{@link #setContinueOnAsyncUploadFailure(boolean) continueOnAsyncUploadFailure}" value="false"/>
+ *     &lt;param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/>
+ *     &lt;param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/>
  * &lt/DataStore>
  */
 public abstract class CachingDataStore extends AbstractDataStore implements
-        MultiDataStoreAware {
+        MultiDataStoreAware, AsyncUploadCallback {
 
     /**
      * Logger instance.
@@ -88,9 +100,7 @@ public abstract class CachingDataStore e
      * All data identifiers that are currently in use are in this set until they
      * are garbage collected.
      */
-    protected Map<DataIdentifier, WeakReference<DataIdentifier>> inUse =
-            Collections.synchronizedMap(new WeakHashMap<DataIdentifier,
-                    WeakReference<DataIdentifier>>());
+    protected Map<DataIdentifier, WeakReference<DataIdentifier>> inUse = Collections.synchronizedMap(new WeakHashMap<DataIdentifier, WeakReference<DataIdentifier>>());
 
     protected Backend backend;
 
@@ -141,11 +151,47 @@ public abstract class CachingDataStore e
      */
     private LocalCache cache;
 
+    /**
+     * Caching holding pending uploads
+     */
+    private AsyncUploadCache asyncWriteCache;
+
     protected abstract Backend createBackend();
 
     protected abstract String getMarkerFile();
 
     /**
+     * In {@link #init(String)},it resumes all incomplete asynchronous upload
+     * from {@link AsyncUploadCache} and uploads them concurrently in multiple
+     * threads. It throws {@link RepositoryException}, if file is not found in
+     * local cache for that asynchronous upload. As far as code is concerned, it
+     * is only possible when somebody has removed files from local cache
+     * manually. If there is an exception and user want to proceed with
+     * inconsistencies, set parameter continueOnAsyncUploadFailure to true in
+     * repository.xml. This will ignore {@link RepositoryException} and log all
+     * missing files and proceed after resetting {@link AsyncUploadCache} .
+     */
+    private boolean continueOnAsyncUploadFailure;
+
+    /**
+     * The {@link #init(String)} methods checks for {@link #getMarkerFile()} and
+     * if it doesn't exists migrates all files from fileystem to {@link Backend}
+     * . This parameter governs number of threads which will upload files
+     * concurrently to {@link Backend}.
+     */
+    private int concurrentUploadsThreads = 10;
+
+    /**
+     * This parameter limits the number of asynchronous uploads slots to
+     * {@link Backend}. Once this limit is reached, further uploads to
+     * {@link Backend} are synchronous, till one of asynchronous uploads
+     * completes and make asynchronous uploads slot available. To disable
+     * asynchronous upload, set {@link #asyncUploadLimit} parameter to 0 in
+     * repository.xml. By default it is 100
+     */
+    private int asyncUploadLimit = 100;
+
+    /**
      * Initialized the data store. If the path is not set, &lt;repository
      * home&gt;/repository/datastore is used. This directory is automatically
      * created if it does not yet exist. During first initialization, it upload
@@ -154,51 +200,92 @@ public abstract class CachingDataStore e
      */
     @Override
     public void init(String homeDir) throws RepositoryException {
-        if (path == null) {
-            path = homeDir + "/repository/datastore";
-        }
-        directory = new File(path);
         try {
+            if (path == null) {
+                path = homeDir + "/repository/datastore";
+                tmpDir = new File(homeDir, "/repository/s3tmp");
+            } else {
+                // cache is moved from 'path' to 'path'/repository/datastore
+                tmpDir = new File(path, "/repository/s3tmp");
+                path = path + "/repository/datastore";
+            }
+            LOG.info("path=[" + path + ",] tmpPath= [" + tmpDir.getPath() + "]");
+            directory = new File(path);
             mkdirs(directory);
-        } catch (IOException e) {
-            throw new DataStoreException("Could not create directory "
-                    + directory.getAbsolutePath(), e);
-        }
-        tmpDir = new File(homeDir, "/repository/s3tmp");
-        try {
             if (!mkdirs(tmpDir)) {
                 FileUtils.cleanDirectory(tmpDir);
                 LOG.info("tmp = " + tmpDir.getPath() + " cleaned");
             }
-        } catch (IOException e) {
-            throw new DataStoreException("Could not create directory "
-                    + tmpDir.getAbsolutePath(), e);
-        }
-        LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor
-                + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor);
-        backend = createBackend();
-        backend.init(this, path, config);
-        String markerFileName = getMarkerFile();
-        if (markerFileName != null) {
-            // create marker file in homeDir to avoid deletion in cache cleanup.
-            File markerFile = new File(homeDir, markerFileName);
-            if (!markerFile.exists()) {
-                LOG.info("load files from local cache");
-                loadFilesFromCache();
-                try {
-                    markerFile.createNewFile();
-                } catch (IOException e) {
-                    throw new DataStoreException(
+
+            asyncWriteCache = new AsyncUploadCache();
+            asyncWriteCache.init(homeDir, path, asyncUploadLimit);
+
+            backend = createBackend();
+            backend.init(this, path, config);
+            String markerFileName = getMarkerFile();
+            if (markerFileName != null) {
+                // create marker file in homeDir to avoid deletion in cache
+                // cleanup.
+                File markerFile = new File(homeDir, markerFileName);
+                if (!markerFile.exists()) {
+                    LOG.info("load files from local cache");
+                    uploadFilesFromCache();
+                    try {
+                        markerFile.createNewFile();
+                    } catch (IOException e) {
+                        throw new DataStoreException(
                             "Could not create marker file "
-                                    + markerFile.getAbsolutePath(), e);
-                }
-            } else {
-                LOG.info("marker file = " + markerFile.getAbsolutePath()
+                                + markerFile.getAbsolutePath(), e);
+                    }
+                } else {
+                    LOG.info("marker file = " + markerFile.getAbsolutePath()
                         + " exists");
+                }
             }
+            // upload any leftover async uploads to backend during last shutdown
+            Set<String> fileList = asyncWriteCache.getAll();
+            if (fileList != null && !fileList.isEmpty()) {
+                List<String> errorFiles = new ArrayList<String>();
+                LOG.info("Uploading [" + fileList + "]  and size ["
+                    + fileList.size() + "] from AsyncUploadCache.");
+                long totalSize = 0;
+                List<File> files = new ArrayList<File>(fileList.size());
+                for (String fileName : fileList) {
+                    File f = new File(path, fileName);
+                    if (!f.exists()) {
+                        errorFiles.add(fileName);
+                        LOG.error("Cannot upload pending file ["
+                            + f.getAbsolutePath() + "]. File doesn't exist.");
+                    } else {
+                        totalSize += f.length();
+                        files.add(new File(homeDir, fileName));
+                    }
+                }
+                new FilesUploader(files, totalSize, concurrentUploadsThreads,
+                    true).upload();
+                if (!continueOnAsyncUploadFailure && errorFiles.size() > 0) {
+                    LOG.error("Pending uploads of files [" + errorFiles
+                        + "] failed. Files do not exist in Local cache.");
+                    LOG.error("To continue set [continueOnAsyncUploadFailure] to true in Datastore configuration in repository.xml."
+                        + " There would be inconsistent data in repository due the missing files. ");
+                    throw new RepositoryException(
+                        "Cannot upload async uploads from local cache. Files not found.");
+                } else {
+                    if (errorFiles.size() > 0) {
+                        LOG.error("Pending uploads of files ["
+                            + errorFiles
+                            + "] failed. Files do not exist"
+                            + " in Local cache. Continuing as [continueOnAsyncUploadFailure] is set to true.");
+                    }
+                    LOG.info("Reseting AsyncWrite Cache list.");
+                    asyncWriteCache.reset();
+                }
+            }
+            cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
+                cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache);
+        } catch (Exception e) {
+            throw new RepositoryException(e);
         }
-        cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
-                cachePurgeTrigFactor, cachePurgeResizeFactor);
     }
 
     /**
@@ -218,6 +305,8 @@ public abstract class CachingDataStore e
     @Override
     public DataRecord addRecord(InputStream input) throws DataStoreException {
         File temporary = null;
+        long startTime = System.currentTimeMillis();
+        long length = 0;
         try {
             temporary = newTemporaryFile();
             DataIdentifier tempId = new DataIdentifier(temporary.getName());
@@ -226,23 +315,47 @@ public abstract class CachingDataStore e
             // stream length and the message digest of the stream
             MessageDigest digest = MessageDigest.getInstance(DIGEST);
             OutputStream output = new DigestOutputStream(new FileOutputStream(
-                    temporary), digest);
+                temporary), digest);
             try {
-                IOUtils.copyLarge(input, output);
+                length = IOUtils.copyLarge(input, output);
             } finally {
                 output.close();
             }
+            long currTime = System.currentTimeMillis();
             DataIdentifier identifier = new DataIdentifier(
-                    encodeHexString(digest.digest()));
+                encodeHexString(digest.digest()));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("getting SHA1 hash  [" + identifier + "] length ["
+                    + length + "],   in [" + (currTime - startTime) + "] ms.");
+            }
+            String fileName = getFileName(identifier);
+            AsyncUploadCacheResult result = null;
             synchronized (this) {
                 usesIdentifier(identifier);
-                backend.write(identifier, temporary);
-                String fileName = getFileName(identifier);
-                cache.store(fileName, temporary);
+                // check if async upload is already in progress
+                if (!asyncWriteCache.hasEntry(fileName, true)) {
+                    result = cache.store(fileName, temporary, true);
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("storing  [" + identifier + "] in localCache took ["
+                    + (System.currentTimeMillis() - currTime) + "] ms.");
+            }
+            if (result != null) {
+                if (result.canAsyncUpload()) {
+                    backend.writeAsync(identifier, result.getFile(), this);
+                } else {
+                    backend.write(identifier, result.getFile());
+                }
             }
             // this will also make sure that
             // tempId is not garbage collected until here
             inUse.remove(tempId);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("write [" + identifier + "] length [" + length
+                    + "],   in [" + (System.currentTimeMillis() - startTime)
+                    + "] ms.");
+            }
             return new CachingDataRecord(this, identifier);
         } catch (NoSuchAlgorithmException e) {
             throw new DataStoreException(DIGEST + " not available", e);
@@ -256,6 +369,35 @@ public abstract class CachingDataStore e
         }
     }
 
+    @Override
+    public DataRecord getRecord(DataIdentifier identifier)
+            throws DataStoreException {
+        String fileName = getFileName(identifier);
+        boolean touch = minModifiedDate > 0 ? true : false;
+        synchronized (this) {
+            try {
+                if (asyncWriteCache.hasEntry(fileName, touch)) {
+                    usesIdentifier(identifier);
+                    return new CachingDataRecord(this, identifier);
+                } else if (cache.getFileIfStored(fileName) != null) {
+                    if (touch) {
+                        backend.exists(identifier, touch);
+                    }
+                    usesIdentifier(identifier);
+                    return new CachingDataRecord(this, identifier);
+                } else if (backend.exists(identifier, touch)) {
+                    usesIdentifier(identifier);
+                    return new CachingDataRecord(this, identifier);
+                }
+
+            } catch (IOException ioe) {
+                throw new DataStoreException("error in getting record ["
+                    + identifier + "]", ioe);
+            }
+        }
+        throw new DataStoreException("Record not found: " + identifier);
+    }
+
     /**
      * Get a data record for the given identifier or null it data record doesn't
      * exist in {@link Backend}
@@ -267,14 +409,20 @@ public abstract class CachingDataStore e
     @Override
     public DataRecord getRecordIfStored(DataIdentifier identifier)
             throws DataStoreException {
+        String fileName = getFileName(identifier);
+        boolean touch = minModifiedDate > 0 ? true : false;
         synchronized (this) {
-            usesIdentifier(identifier);
-            if (!backend.exists(identifier)) {
-                return null;
+            try {
+                if (asyncWriteCache.hasEntry(fileName, touch)
+                    || backend.exists(identifier, touch)) {
+                    usesIdentifier(identifier);
+                    return new CachingDataRecord(this, identifier);
+                }
+            } catch (IOException ioe) {
+                throw new DataStoreException(ioe);
             }
-            backend.touch(identifier, minModifiedDate);
-            return new CachingDataRecord(this, identifier);
         }
+        return null;
     }
 
     @Override
@@ -289,7 +437,15 @@ public abstract class CachingDataStore e
     @Override
     public Iterator<DataIdentifier> getAllIdentifiers()
             throws DataStoreException {
-        return backend.getAllIdentifiers();
+        Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
+        for (String fileName : asyncWriteCache.getAll()) {
+            ids.add(getIdentifier(fileName));
+        }
+        Iterator<DataIdentifier> itr = backend.getAllIdentifiers();
+        while (itr.hasNext()) {
+            ids.add(itr.next());
+        }
+        return ids.iterator();
     }
 
     /**
@@ -301,20 +457,35 @@ public abstract class CachingDataStore e
             throws DataStoreException {
         String fileName = getFileName(identifier);
         synchronized (this) {
-            backend.deleteRecord(identifier);
-            cache.delete(fileName);
+            try {
+                // order is important here
+                asyncWriteCache.delete(fileName);
+                backend.deleteRecord(identifier);
+                cache.delete(fileName);
+            } catch (IOException ioe) {
+                throw new DataStoreException(ioe);
+            }
         }
     }
 
     @Override
     public synchronized int deleteAllOlderThan(long min)
             throws DataStoreException {
-        List<DataIdentifier> diList = backend.deleteAllOlderThan(min);
+        Set<DataIdentifier> diSet = backend.deleteAllOlderThan(min);
         // remove entries from local cache
-        for (DataIdentifier identifier : diList) {
+        for (DataIdentifier identifier : diSet) {
             cache.delete(getFileName(identifier));
         }
-        return diList.size();
+        try {
+            for (String fileName : asyncWriteCache.deleteOlderThan(min)) {
+                diSet.add(getIdentifier(fileName));
+            }
+        } catch (IOException e) {
+            throw new DataStoreException(e);
+        }
+        LOG.info("deleteAllOlderThan  exit. Deleted [" + diSet
+            + "] records. Number of records deleted [" + diSet.size() + "]");
+        return diSet.size();
     }
 
     /**
@@ -344,9 +515,23 @@ public abstract class CachingDataStore e
      * Return lastModified of record from {@link Backend} assuming
      * {@link Backend} as a single source of truth.
      */
-    public long getLastModified(DataIdentifier identifier) throws DataStoreException {
-        LOG.info("accessed lastModified");
-        return backend.getLastModified(identifier);
+    public long getLastModified(DataIdentifier identifier)
+            throws DataStoreException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("accessed lastModified of identifier:" + identifier);
+        }
+        String fileName = getFileName(identifier);
+        long lastModified = asyncWriteCache.getLastModified(fileName);
+        if (lastModified != 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("getlastModified of identifier [" + identifier
+                    + "] from AsyncUploadCache = " + lastModified);
+            }
+            return lastModified;
+
+        } else {
+            return backend.getLastModified(identifier);
+        }
     }
 
     /**
@@ -358,6 +543,22 @@ public abstract class CachingDataStore e
         Long length = cache.getFileLength(fileName);
         if (length != null) {
             return length.longValue();
+        } else {
+            InputStream in = null;
+            InputStream cachedStream = null;
+            try {
+                in = backend.read(identifier);
+                cachedStream = cache.store(fileName, in);
+            } catch (IOException e) {
+                throw new DataStoreException("IO Exception: " + identifier, e);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(cachedStream);
+            }
+            length = cache.getFileLength(fileName);
+            if (length != null) {
+                return length.longValue();
+            }
         }
         return backend.getLength(identifier);
     }
@@ -371,6 +572,52 @@ public abstract class CachingDataStore e
         }
     }
 
+    public Set<String> getPendingUploads() {
+        return asyncWriteCache.getAll();
+    }
+
+    public void call(DataIdentifier identifier, File file,
+            AsyncUploadCallback.RESULT resultCode) {
+        String fileName = getFileName(identifier);
+        if (AsyncUploadCallback.RESULT.SUCCESS.equals(resultCode)) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Upload completed. [" + identifier + "].");
+                }
+                AsyncUploadCacheResult result = asyncWriteCache.remove(fileName);
+                if (result.doRequiresDelete()) {
+                    // added record already marked for delete
+                    deleteRecord(identifier);
+                }
+            } catch (IOException ie) {
+                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+                    + identifier + "], file [" + file.getAbsolutePath() + "]",
+                    ie);
+            } catch (DataStoreException dse) {
+                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+                    + identifier + "], file [" + file.getAbsolutePath() + "]",
+                    dse);
+            }
+        } else if (AsyncUploadCallback.RESULT.FAILED.equals(resultCode)) {
+            LOG.error("Async Upload failed. Dataidentifer [ " + identifier
+                + "], file [" + file.getAbsolutePath() + "]");
+        } else if (AsyncUploadCallback.RESULT.ABORTED.equals(resultCode)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Async Upload Aborted. Dataidentifer [ " + identifier
+                    + "], file [" + file.getAbsolutePath() + "]");
+            }
+            try {
+                asyncWriteCache.remove(fileName);
+                LOG.info("Async Upload Aborted. Dataidentifer [ " + identifier
+                    + "], file [" + file.getAbsolutePath() + "] removed.");
+            } catch (IOException ie) {
+                LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+                    + identifier + "], file [" + file.getAbsolutePath() + "]",
+                    ie);
+            }
+        }
+    }
+
     /**
      * Returns a unique temporary file to be used for creating a new data
      * record.
@@ -382,36 +629,57 @@ public abstract class CachingDataStore e
     /**
      * Load files from {@link LocalCache} to {@link Backend}.
      */
-    private void loadFilesFromCache() throws RepositoryException {
+    private void uploadFilesFromCache() throws RepositoryException {
         ArrayList<File> files = new ArrayList<File>();
         listRecursive(files, directory);
         long totalSize = 0;
         for (File f : files) {
             totalSize += f.length();
         }
+        if (concurrentUploadsThreads > 1) {
+            new FilesUploader(files, totalSize, concurrentUploadsThreads, false).upload();
+        } else {
+            uploadFilesInSingleThread(files, totalSize);
+        }
+    }
+
+    private void uploadFilesInSingleThread(List<File> files, long totalSize)
+            throws RepositoryException {
+        long startTime = System.currentTimeMillis();
+        LOG.info("Upload:  {" + files.size() + "} files in single thread.");
+        long currentCount = 0;
         long currentSize = 0;
         long time = System.currentTimeMillis();
         for (File f : files) {
             long now = System.currentTimeMillis();
             if (now > time + 5000) {
-                LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+                LOG.info("Uploaded:  {" + currentCount + "}/{" + files.size()
+                    + "} files, {" + currentSize + "}/{" + totalSize
+                    + "} size data");
                 time = now;
             }
-            currentSize += f.length();
             String name = f.getName();
-            LOG.debug("upload file = " + name);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("upload file = " + name);
+            }
             if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
-                    && f.length() > 0) {
-                loadFileToBackEnd(f);
+                && f.length() > 0) {
+                uploadFileToBackEnd(f, false);
             }
+            currentSize += f.length();
+            currentCount++;
         }
-        LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+        long endTime = System.currentTimeMillis();
+        LOG.info("Uploaded:  {" + currentCount + "}/{" + files.size()
+            + "} files, {" + currentSize + "}/{" + totalSize
+            + "} size data, time taken {" + ((endTime - startTime) / 1000)
+            + "} sec");
     }
 
     /**
      * Traverse recursively and populate list with files.
      */
-    private void listRecursive(List<File> list, File file) {
+    private static void listRecursive(List<File> list, File file) {
         File[] files = file.listFiles();
         if (files != null) {
             for (File f : files) {
@@ -431,12 +699,22 @@ public abstract class CachingDataStore e
      *            file to uploaded.
      * @throws DataStoreException
      */
-    private void loadFileToBackEnd(File f) throws DataStoreException {
-        DataIdentifier identifier = new DataIdentifier(f.getName());
-        usesIdentifier(identifier);
-        backend.write(identifier, f);
-        LOG.debug(f.getName() + "uploaded.");
-
+    private void uploadFileToBackEnd(File f, boolean updateAsyncUploadCache)
+            throws DataStoreException {
+        try {
+            DataIdentifier identifier = new DataIdentifier(f.getName());
+            usesIdentifier(identifier);
+            backend.write(identifier, f);
+            if (updateAsyncUploadCache) {
+                String fileName = getFileName(identifier);
+                asyncWriteCache.remove(fileName);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(f.getName() + "uploaded.");
+            }
+        } catch (IOException ioe) {
+            throw new DataStoreException(ioe);
+        }
     }
 
     /**
@@ -444,9 +722,17 @@ public abstract class CachingDataStore e
      */
     private static String getFileName(DataIdentifier identifier) {
         String name = identifier.toString();
-        name = name.substring(0, 2) + "/" + name.substring(2, 4) + "/"
-                + name.substring(4, 6) + "/" + name;
-        return name;
+        return getFileName(name);
+    }
+
+    private static String getFileName(String name) {
+        return name.substring(0, 2) + "/" + name.substring(2, 4) + "/"
+            + name.substring(4, 6) + "/" + name;
+    }
+
+    private static DataIdentifier getIdentifier(String fileName) {
+        return new DataIdentifier(
+            fileName.substring(fileName.lastIndexOf("/") + 1));
     }
 
     private void usesIdentifier(DataIdentifier identifier) {
@@ -457,15 +743,15 @@ public abstract class CachingDataStore e
         if (dir.exists()) {
             if (dir.isFile()) {
                 throw new IOException("Can not create a directory "
-                        + "because a file exists with the same name: "
-                        + dir.getAbsolutePath());
+                    + "because a file exists with the same name: "
+                    + dir.getAbsolutePath());
             }
             return false;
         }
         boolean created = dir.mkdirs();
         if (!created) {
             throw new IOException("Could not create directory: "
-                    + dir.getAbsolutePath());
+                + dir.getAbsolutePath());
         }
         return created;
     }
@@ -483,7 +769,6 @@ public abstract class CachingDataStore e
     public void close() throws DataStoreException {
         cache.close();
         backend.close();
-        cache = null;
     }
 
     /**
@@ -551,7 +836,6 @@ public abstract class CachingDataStore e
     }
 
     /**
-     * 
      * @return path of {@link LocalCache}.
      */
     public String getPath() {
@@ -602,4 +886,208 @@ public abstract class CachingDataStore e
         this.cachePurgeResizeFactor = cachePurgeResizeFactor;
     }
 
+    public int getConcurrentUploadsThreads() {
+        return concurrentUploadsThreads;
+    }
+
+    public void setConcurrentUploadsThreads(int concurrentUploadsThreads) {
+        this.concurrentUploadsThreads = concurrentUploadsThreads;
+    }
+
+    public int getAsyncUploadLimit() {
+        return asyncUploadLimit;
+    }
+
+    public void setAsyncUploadLimit(int asyncUploadLimit) {
+        this.asyncUploadLimit = asyncUploadLimit;
+    }
+
+    public boolean isContinueOnAsyncUploadFailure() {
+        return continueOnAsyncUploadFailure;
+    }
+
+    public void setContinueOnAsyncUploadFailure(
+            boolean continueOnAsyncUploadFailure) {
+        this.continueOnAsyncUploadFailure = continueOnAsyncUploadFailure;
+    }
+
+    public Backend getBackend() {
+        return backend;
+    }
+
+    /**
+     * This class initiates files upload in multiple threads to backend.
+     */
+    private class FilesUploader {
+        final List<File> files;
+
+        final long totalSize;
+
+        volatile AtomicInteger currentCount = new AtomicInteger();
+
+        volatile AtomicLong currentSize = new AtomicLong();
+
+        volatile AtomicBoolean exceptionRaised = new AtomicBoolean();
+
+        DataStoreException exception;
+
+        final int threads;
+
+        final boolean updateAsyncCache;
+
+        FilesUploader(List<File> files, long totalSize, int threads,
+                boolean updateAsyncCache) {
+            super();
+            this.files = files;
+            this.threads = threads;
+            this.totalSize = totalSize;
+            this.updateAsyncCache = updateAsyncCache;
+        }
+
+        void addCurrentCount(int delta) {
+            currentCount.addAndGet(delta);
+        }
+
+        void addCurrentSize(long delta) {
+            currentSize.addAndGet(delta);
+        }
+
+        synchronized void setException(DataStoreException exception) {
+            exceptionRaised.getAndSet(true);
+            this.exception = exception;
+        }
+
+        boolean isExceptionRaised() {
+            return exceptionRaised.get();
+        }
+
+        void logProgress() {
+            LOG.info("Uploaded:  {" + currentCount.get() + "}/{" + files.size()
+                + "} files, {" + currentSize.get() + "}/{" + totalSize
+                + "} size data");
+        }
+
+        void upload() throws DataStoreException {
+            long startTime = System.currentTimeMillis();
+            LOG.info(" Uploading " + files.size() + " using " + threads
+                + " threads.");
+            ExecutorService executor = Executors.newFixedThreadPool(threads,
+                new NamedThreadFactory("backend-file-upload-worker"));
+            int partitionSize = files.size() / (threads);
+            int startIndex = 0;
+            int endIndex = partitionSize;
+            for (int i = 1; i <= threads; i++) {
+                List<File> partitionFileList = Collections.unmodifiableList(files.subList(
+                    startIndex, endIndex));
+                FileUploaderThread fut = new FileUploaderThread(
+                    partitionFileList, startIndex, endIndex, this,
+                    updateAsyncCache);
+                executor.execute(fut);
+
+                startIndex = endIndex;
+                if (i == (threads - 1)) {
+                    endIndex = files.size();
+                } else {
+                    endIndex = startIndex + partitionSize;
+                }
+            }
+            // This will make the executor accept no new threads
+            // and finish all existing threads in the queue
+            executor.shutdown();
+
+            try {
+                // Wait until all threads are finish
+                while (!isExceptionRaised()
+                    && !executor.awaitTermination(15, TimeUnit.SECONDS)) {
+                    logProgress();
+                }
+            } catch (InterruptedException ie) {
+
+            }
+            long endTime = System.currentTimeMillis();
+            LOG.info("Uploaded:  {" + currentCount.get() + "}/{" + files.size()
+                + "} files, {" + currentSize.get() + "}/{" + totalSize
+                + "} size data, time taken {" + ((endTime - startTime) / 1000)
+                + "} sec");
+            if (isExceptionRaised()) {
+                executor.shutdownNow(); // Cancel currently executing tasks
+                throw exception;
+            }
+        }
+
+    }
+
+    /**
+     * This class implements {@link Runnable} interface and uploads list of
+     * files from startIndex to endIndex to {@link Backend}
+     */
+    private class FileUploaderThread implements Runnable {
+        final List<File> files;
+
+        final FilesUploader filesUploader;
+
+        final int startIndex;
+
+        final int endIndex;
+
+        final boolean updateAsyncCache;
+
+        FileUploaderThread(List<File> files, int startIndex, int endIndex,
+                FilesUploader controller, boolean updateAsyncCache) {
+            super();
+            this.files = files;
+            this.filesUploader = controller;
+            this.startIndex = startIndex;
+            this.endIndex = endIndex;
+            this.updateAsyncCache = updateAsyncCache;
+        }
+
+        public void run() {
+            long time = System.currentTimeMillis();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Thread [ " + Thread.currentThread().getName()
+                    + "]: Uploading files from startIndex[" + startIndex
+                    + "] and endIndex [" + (endIndex - 1)
+                    + "], both inclusive.");
+            }
+            int uploadCount = 0;
+            long uploadSize = 0;
+            try {
+                for (File f : files) {
+
+                    if (filesUploader.isExceptionRaised()) {
+                        break;
+                    }
+                    String name = f.getName();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("upload file = " + name);
+                    }
+                    if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
+                        && f.length() > 0) {
+                        uploadFileToBackEnd(f, updateAsyncCache);
+                    }
+                    uploadCount++;
+                    uploadSize += f.length();
+                    // update upload status at every 15 seconds.
+                    long now = System.currentTimeMillis();
+                    if (now > time + 15000) {
+                        filesUploader.addCurrentCount(uploadCount);
+                        filesUploader.addCurrentSize(uploadSize);
+                        uploadCount = 0;
+                        uploadSize = 0;
+                        time = now;
+                    }
+                }
+                // update final state.
+                filesUploader.addCurrentCount(uploadCount);
+                filesUploader.addCurrentSize(uploadSize);
+            } catch (DataStoreException e) {
+                if (!filesUploader.isExceptionRaised()) {
+                    filesUploader.setException(e);
+                }
+            }
+
+        }
+    }
+
 }

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java?rev=1577127&r1=1577126&r2=1577127&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java Thu Mar 13 12:12:33 2014
@@ -84,6 +84,8 @@ public class LocalCache {
      * no-op.
      */
     private volatile boolean purgeMode;
+    
+    private AsyncUploadCache asyncUploadCache;
 
     /**
      * Build LRU cache of files located at 'path'. It uses lastModified property
@@ -98,66 +100,22 @@ public class LocalCache {
      * cache will go in auto-purge mode.
      * @param cachePurgeResizeFactor after cache purge size of cache will be
      * just less (cachePurgeResizeFactor * maxSize).
+     * @param asyncUploadCache {@link AsyncUploadCache}
      * @throws RepositoryException
      */
-    public LocalCache(final String path, final String tmpPath,
-            final long maxSize, final double cachePurgeTrigFactor,
-            final double cachePurgeResizeFactor) throws RepositoryException {
-        this.maxSize = maxSize;
+    public LocalCache(String path, String tmpPath, long size, double cachePurgeTrigFactor,
+            double cachePurgeResizeFactor, AsyncUploadCache asyncUploadCache) throws IOException,
+            ClassNotFoundException {
+        this.maxSize = size;
         directory = new File(path);
         tmp = new File(tmpPath);
-        cache = new LRUCache(maxSize, cachePurgeTrigFactor,
-            cachePurgeResizeFactor);
-        ArrayList<File> allFiles = new ArrayList<File>();
-
-        Iterator<File> it = FileUtils.iterateFiles(directory, null, true);
-        while (it.hasNext()) {
-            File f = it.next();
-            allFiles.add(f);
-        }
-        Collections.sort(allFiles, new Comparator<File>() {
-            @Override
-            public int compare(final File o1, final File o2) {
-                long l1 = o1.lastModified(), l2 = o2.lastModified();
-                return l1 < l2 ? -1 : l1 > l2 ? 1 : 0;
-            }
-        });
-        String dataStorePath = directory.getAbsolutePath();
-        long time = System.currentTimeMillis();
-        int count = 0;
-        int deletecount = 0;
-        for (File f : allFiles) {
-            if (f.exists()) {
-                long length = f.length();
-                String name = f.getPath();
-                if (name.startsWith(dataStorePath)) {
-                    name = name.substring(dataStorePath.length());
-                }
-                // convert to java path format
-                name = name.replace("\\", "/");
-                if (name.startsWith("/") || name.startsWith("\\")) {
-                    name = name.substring(1);
-                }
-                if ((cache.currentSizeInBytes + length) < cache.maxSizeInBytes) {
-                    count++;
-                    cache.put(name, length);
-                } else {
-                    if (tryDelete(name)) {
-                        deletecount++;
-                    }
-                }
-                long now = System.currentTimeMillis();
-                if (now > time + 5000) {
-                    LOG.info("Processed {" + (count + deletecount) + "}/{"
-                        + allFiles.size() + "}");
-                    time = now;
-                }
-            }
-        }
-        LOG.info("Cached {" + count + "}/{" + allFiles.size()
-            + "} , currentSizeInBytes = " + cache.currentSizeInBytes);
-        LOG.info("Deleted {" + deletecount + "}/{" + allFiles.size()
-            + "} files .");
+        LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor
+            + ", cachePurgeTrigFactorSize = " + (cachePurgeTrigFactor * size) + ", cachePurgeResizeFactor = "
+            + (cachePurgeResizeFactor * size));
+        cache = new LRUCache(size, cachePurgeTrigFactor, cachePurgeResizeFactor);
+        this.asyncUploadCache = asyncUploadCache;
+
+        new Thread(new CacheBuildJob()).start();
     }
 
     /**
@@ -168,51 +126,50 @@ public class LocalCache {
      * doesn't close the incoming inputstream.
      * 
      * @param fileName the key of cache.
-     * @param in the inputstream.
+     * @param in {@link InputStream}
      * @return the (new) input stream.
      */
-    public synchronized InputStream store(String fileName, final InputStream in)
+    public InputStream store(String fileName, final InputStream in)
             throws IOException {
         fileName = fileName.replace("\\", "/");
         File f = getFile(fileName);
         long length = 0;
-        if (!f.exists() || isInPurgeMode()) {
-            OutputStream out = null;
-            File transFile = null;
-            try {
-                TransientFileFactory tff = TransientFileFactory.getInstance();
-                transFile = tff.createTransientFile("s3-", "tmp", tmp);
-                out = new BufferedOutputStream(new FileOutputStream(transFile));
-                length = IOUtils.copyLarge(in, out);
-            } finally {
-                IOUtils.closeQuietly(out);
-            }
-            // rename the file to local fs cache
-            if (canAdmitFile(length)
-                && (f.getParentFile().exists() || f.getParentFile().mkdirs())
-                && transFile.renameTo(f) && f.exists()) {
-                if (transFile.exists() && transFile.delete()) {
-                    LOG.warn("tmp file = " + transFile.getAbsolutePath()
-                        + " not deleted successfully");
-                }
-                transFile = null;
-                toBeDeleted.remove(fileName);
-                if (cache.get(fileName) == null) {
+        synchronized (this) {
+            if (!f.exists() || isInPurgeMode()) {
+                OutputStream out = null;
+                File transFile = null;
+                try {
+                    TransientFileFactory tff = TransientFileFactory.getInstance();
+                    transFile = tff.createTransientFile("s3-", "tmp", tmp);
+                    out = new BufferedOutputStream(new FileOutputStream(transFile));
+                    length = IOUtils.copyLarge(in, out);
+                } finally {
+                    IOUtils.closeQuietly(out);
+                }
+                // rename the file to local fs cache
+                if (canAdmitFile(length)
+                    && (f.getParentFile().exists() || f.getParentFile().mkdirs())
+                    && transFile.renameTo(f) && f.exists()) {
+                    if (transFile.exists() && transFile.delete()) {
+                        LOG.info("tmp file = " + transFile.getAbsolutePath()
+                            + " not deleted successfully");
+                    }
+                    transFile = null;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("file [" + fileName + "] added to local cache.");
+                    }
                     cache.put(fileName, f.length());
+                } else {
+                    f = transFile;
                 }
             } else {
-                f = transFile;
-            }
-        } else {
-            // f.exists and not in purge mode
-            f.setLastModified(System.currentTimeMillis());
-            toBeDeleted.remove(fileName);
-            if (cache.get(fileName) == null) {
+                // f.exists and not in purge mode
+                f.setLastModified(System.currentTimeMillis());
                 cache.put(fileName, f.length());
             }
+            cache.tryPurge();
+            return new LazyFileInputStream(f);
         }
-        cache.tryPurge();
-        return new LazyFileInputStream(f);
     }
 
     /**
@@ -224,29 +181,59 @@ public class LocalCache {
      * @param src file to be added to cache.
      * @throws IOException
      */
-    public synchronized void store(String fileName, final File src)
-            throws IOException {
+    public synchronized File store(String fileName, final File src) {
+        try {
+            return store(fileName, src, false).getFile();
+        } catch (IOException ioe) {
+            LOG.warn("Exception in addding file [" + fileName + "] to local cache.", ioe);
+        }
+        return null;
+    }
+
+    /**
+     * This method add file to {@link LocalCache} and tries that file can be
+     * added to {@link AsyncUploadCache}. If file is added to
+     * {@link AsyncUploadCache} successfully, it sets
+     * {@link AsyncUploadResult#setAsyncUpload(boolean)} to true.
+     *
+     * @param fileName name of the file.
+     * @param src source file.
+     * @param tryForAsyncUpload If true it tries to add fileName to
+     *            {@link AsyncUploadCache}
+     * @return {@link AsyncUploadCacheResult}. This method sets
+     *         {@link AsyncUploadResult#setAsyncUpload(boolean)} to true, if
+     *         fileName is added to {@link AsyncUploadCache} successfully else
+     *         it sets {@link AsyncUploadCacheResult#setAsyncUpload(boolean)} to
+     *         false. {@link AsyncUploadCacheResult#getFile()} contains cached
+     *         file, if it is added to {@link LocalCache} or original file.
+     * @throws IOException
+     */
+    public synchronized AsyncUploadCacheResult store(String fileName, File src, boolean tryForAsyncUpload) throws IOException {
         fileName = fileName.replace("\\", "/");
         File dest = getFile(fileName);
         File parent = dest.getParentFile();
-        if (src.exists() && !dest.exists() && !src.equals(dest)
-            && canAdmitFile(src.length())
-            && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest))) {
-            toBeDeleted.remove(fileName);
-            if (cache.get(fileName) == null) {
-                cache.put(fileName, dest.length());
-            }
-
-        } else if (dest.exists()) {
-            dest.setLastModified(System.currentTimeMillis());
-            toBeDeleted.remove(fileName);
-            if (cache.get(fileName) == null) {
-                cache.put(fileName, dest.length());
+        AsyncUploadCacheResult result = new AsyncUploadCacheResult();
+        result.setFile(src);
+        result.setAsyncUpload(false);
+        boolean destExists = false;
+        if ((destExists = dest.exists())
+            || (src.exists() && !dest.exists() && !src.equals(dest) && canAdmitFile(src.length())
+                && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest)))) {
+            if (destExists) {
+                dest.setLastModified(System.currentTimeMillis());
+            }
+            cache.put(fileName, dest.length());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("file [" + fileName + "] added to local cache.");
+            }
+            result.setFile(dest);
+            if (tryForAsyncUpload) {
+                result.setAsyncUpload(asyncUploadCache.add(fileName).canAsyncUpload());
             }
         }
         cache.tryPurge();
+        return result;
     }
-
     /**
      * Return the inputstream from from cache, or null if not in the cache.
      * 
@@ -254,16 +241,23 @@ public class LocalCache {
      * @return  stream or null.
      */
     public InputStream getIfStored(String fileName) throws IOException {
+        File file = getFileIfStored(fileName);
+        return file == null ? null : new LazyFileInputStream(file);
+    }
 
+    public synchronized File getFileIfStored(String fileName) throws IOException {
         fileName = fileName.replace("\\", "/");
         File f = getFile(fileName);
-        synchronized (this) {
-            if (!f.exists() || isInPurgeMode()) {
-                log("purgeMode true or file doesn't exists: getIfStored returned");
-                return null;
-            }
+        // return file in purge mode = true and file present in asyncUploadCache
+        // as asyncUploadCache's files will be not be deleted in cache purge.
+        if (!f.exists() || (isInPurgeMode() && !asyncUploadCache.hasEntry(fileName, false))) {
+            log("purgeMode true or file doesn't exists: getFileIfStored returned");
+            return null;
+        } else {
+            // touch entry in LRU caches
+            cache.put(fileName, f.length());
             f.setLastModified(System.currentTimeMillis());
-            return new LazyFileInputStream(f);
+            return f;
         }
     }
 
@@ -286,17 +280,20 @@ public class LocalCache {
      * Returns length of file if exists in cache else returns null.
      * @param fileName name of the file.
      */
-    public Long getFileLength(String fileName) {
-        fileName = fileName.replace("\\", "/");
-        File f = getFile(fileName);
-        synchronized (this) {
-            if (!f.exists() || isInPurgeMode()) {
-                log("purgeMode true or file doesn't exists: getFileLength returned");
-                return null;
+    public synchronized Long getFileLength(String fileName) {
+        Long length = null;
+        try {
+            length = cache.get(fileName);
+            if( length == null ) {
+                File f = getFileIfStored(fileName);
+                if (f != null) {
+                    length = f.length();
+                }
             }
-            f.setLastModified(System.currentTimeMillis());
-            return f.length();
+        } catch (IOException ignore) {
+
         }
+        return length;
     }
 
     /**
@@ -315,11 +312,10 @@ public class LocalCache {
      * @return true if yes else return false.
      */
     private synchronized boolean canAdmitFile(final long length) {
-        // order is important here
-        boolean value = !isInPurgeMode() && cache.canAdmitFile(length);
+      //order is important here
+        boolean value = !isInPurgeMode() && (cache.canAdmitFile(length));
         if (!value) {
-            log("cannot admit file of length=" + length
-                + " and currentSizeInBytes=" + cache.currentSizeInBytes);
+            log("cannot admit file of length=" + length + " and currentSizeInBytes=" + cache.currentSizeInBytes);
         }
         return value;
     }
@@ -410,11 +406,11 @@ public class LocalCache {
 
         final long maxSizeInBytes;
 
-        long cachePurgeResize;
+        final long cachePurgeResize;
         
-        private long cachePurgeTrigSize;
+        final long cachePurgeTrigSize;
 
-        public LRUCache(final long maxSizeInBytes,
+        LRUCache(final long maxSizeInBytes,
                 final double cachePurgeTrigFactor,
                 final double cachePurgeResizeFactor) {
             super(maxSizeElements(maxSizeInBytes), (float) 0.75, true);
@@ -433,20 +429,32 @@ public class LocalCache {
         public synchronized Long remove(final Object key) {
             String fileName = (String) key;
             fileName = fileName.replace("\\", "/");
+            try {
+                // not removing file from local cache, if there is in progress
+                // async upload on it.
+                if (asyncUploadCache.hasEntry(fileName, false)) {
+                    LOG.info("AsyncUploadCache upload contains file [" + fileName
+                        + "]. Not removing it from LocalCache.");
+                    return null;
+                }
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("error: ", e);
+                }
+                return null;
+            }
             Long flength = null;
             if (tryDelete(fileName)) {
                 flength = super.remove(key);
                 if (flength != null) {
-                    log("cache entry { " + fileName + "} with size {" + flength
-                        + "} removed.");
+                    log("cache entry { " + fileName + "} with size {" + flength + "} removed.");
                     currentSizeInBytes -= flength.longValue();
                 }
             } else if (!getFile(fileName).exists()) {
                 // second attempt. remove from cache if file doesn't exists
                 flength = super.remove(key);
                 if (flength != null) {
-                    log(" file not exists. cache entry { " + fileName
-                        + "} with size {" + flength + "} removed.");
+                    log(" file not exists. cache entry { " + fileName + "} with size {" + flength + "} removed.");
                     currentSizeInBytes -= flength.longValue();
                 }
             }
@@ -454,10 +462,15 @@ public class LocalCache {
         }
 
         @Override
-        public synchronized Long put(final String key, final Long value) {
-            long flength = value.longValue();
-            currentSizeInBytes += flength;
-            return super.put(key.replace("\\", "/"), value);
+        public synchronized Long put(final String fileName, final Long value) {
+            Long oldValue = cache.get(fileName);
+            if (oldValue == null) {
+                long flength = value.longValue();
+                currentSizeInBytes += flength;
+                return super.put(fileName.replace("\\", "/"), value);
+            }
+           toBeDeleted.remove(fileName);
+           return oldValue;
         }
 
         /**
@@ -468,10 +481,14 @@ public class LocalCache {
         synchronized void tryPurge() {
             if (currentSizeInBytes > cachePurgeTrigSize && !isInPurgeMode()) {
                 setPurgeMode(true);
-                LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes
-                    + "] exceeds (cachePurgeTrigSize)["
-                    + cache.cachePurgeTrigSize + "]");
+                LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes + "] exceeds (cachePurgeTrigSize)[" + cache.cachePurgeTrigSize
+                    + "]");
                 new Thread(new PurgeJob()).start();
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("currentSizeInBytes[" + cache.currentSizeInBytes + "] and  (cachePurgeTrigSize)[" + cache.cachePurgeTrigSize
+                        + "], isInPurgeMode =[" + isInPurgeMode() + "]");
+                }
             }
         }
         /**
@@ -532,4 +549,64 @@ public class LocalCache {
             }
         }
     }
+    
+    /**
+     * This class implements {@link Runnable} interface to build LRU cache
+     * asynchronously.
+     */
+    private class CacheBuildJob implements Runnable {
+        public void run() {
+            long startTime = System.currentTimeMillis();
+            ArrayList<File> allFiles = new ArrayList<File>();
+            Iterator<File> it = FileUtils.iterateFiles(directory, null, true);
+            while (it.hasNext()) {
+                File f = it.next();
+                allFiles.add(f);
+            }
+            long t1 = System.currentTimeMillis();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Time taken to recursive [" + allFiles.size() + "] took [" + ((t1 - startTime) / 1000) + "]sec");
+            }
+            Collections.sort(allFiles, new Comparator<File>() {
+                public int compare(File o1, File o2) {
+                    long l1 = o1.lastModified(), l2 = o2.lastModified();
+                    return l1 < l2 ? -1 : l1 > l2 ? 1 : 0;
+                }
+            });
+            long t2 = System.currentTimeMillis();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Time taken to sort [" + allFiles.size() + "] took [" + ((t2 - t1) / 1000) + "]sec");
+            }
+            String dataStorePath = directory.getAbsolutePath();
+            long time = System.currentTimeMillis();
+            int count = 0;
+            for (File f : allFiles) {
+                if (f.exists()) {
+                    count++;
+                    String name = f.getPath();
+                    if (name.startsWith(dataStorePath)) {
+                        name = name.substring(dataStorePath.length());
+                    }
+                    // convert to java path format
+                    name = name.replace("\\", "/");
+                    if (name.startsWith("/") || name.startsWith("\\")) {
+                        name = name.substring(1);
+                    }
+                    store(name, f);
+                    long now = System.currentTimeMillis();
+                    if (now > time + 10000) {
+                        LOG.info("Processed {" + (count) + "}/{" + allFiles.size() + "}");
+                        time = now;
+                    }
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Processed {" + count + "}/{" + allFiles.size() + "} , currentSizeInBytes = " + cache.currentSizeInBytes
+                    + ",  maxSizeInBytes = " + cache.maxSizeInBytes + ",  cache.filecount = " + cache.size());
+            }
+            long t3 = System.currentTimeMillis();
+            LOG.info("Time to build cache of  [" + allFiles.size() + "] took [" + ((t3 - startTime) / 1000) + "]sec");
+        }
+    }
 }
+

Added: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.core.data.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class extends {@link ThreadFactory} to creates named threads.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+
+    private AtomicInteger threadCount = new AtomicInteger(1);
+
+    String threadPrefixName;
+
+    public NamedThreadFactory(String threadPrefixName) {
+        super();
+        this.threadPrefixName = threadPrefixName;
+    }
+
+    public Thread newThread(Runnable r) {
+        Thread thread = new Thread(r);
+        thread.setContextClassLoader(getClass().getClassLoader());
+        thread.setName(threadPrefixName + "-" + threadCount.getAndIncrement());
+        return thread;
+    }
+
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.core.data;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * An in-memory backend implementation used to speed up testing.
+ */
+public class InMemoryBackend implements Backend {
+
+    private HashMap<DataIdentifier, byte[]> data = new HashMap<DataIdentifier, byte[]>();
+
+    private HashMap<DataIdentifier, Long> timeMap = new HashMap<DataIdentifier, Long>();
+
+    @Override
+    public void init(CachingDataStore store, String homeDir, String config)
+            throws DataStoreException {
+        // ignore
+        log("init");
+    }
+
+    @Override
+    public void close() {
+        // ignore
+        log("close");
+    }
+
+    @Override
+    public boolean exists(final DataIdentifier identifier) {
+        log("exists " + identifier);
+        return data.containsKey(identifier);
+    }
+
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers()
+            throws DataStoreException {
+        log("getAllIdentifiers");
+        return data.keySet().iterator();
+    }
+
+    @Override
+    public InputStream read(final DataIdentifier identifier)
+            throws DataStoreException {
+        log("read " + identifier);
+        return new ByteArrayInputStream(data.get(identifier));
+    }
+
+    @Override
+    public void writeAsync(final DataIdentifier identifier, final File file,
+            final AsyncUploadCallback callback) throws DataStoreException {
+        this.write(identifier, file, true, callback);
+    }
+
+    @Override
+    public void write(final DataIdentifier identifier, final File file)
+            throws DataStoreException {
+        this.write(identifier, file, false, null);
+    }
+
+    @Override
+    public long getLastModified(final DataIdentifier identifier)
+            throws DataStoreException {
+        log("getLastModified " + identifier);
+        return timeMap.get(identifier);
+    }
+
+    @Override
+    public void deleteRecord(final DataIdentifier identifier)
+            throws DataStoreException {
+        timeMap.remove(identifier);
+        data.remove(identifier);
+    }
+
+    @Override
+    public Set<DataIdentifier> deleteAllOlderThan(final long min) {
+        log("deleteAllOlderThan " + min);
+        Set<DataIdentifier> tobeDeleted = new HashSet<DataIdentifier>();
+        for (Map.Entry<DataIdentifier, Long> entry : timeMap.entrySet()) {
+            DataIdentifier identifier = entry.getKey();
+            long timestamp = entry.getValue();
+            if (timestamp < min) {
+                tobeDeleted.add(identifier);
+            }
+        }
+        for (DataIdentifier identifier : tobeDeleted) {
+            timeMap.remove(identifier);
+            data.remove(identifier);
+        }
+        return tobeDeleted;
+    }
+
+    @Override
+    public long getLength(final DataIdentifier identifier)
+            throws DataStoreException {
+        try {
+            return data.get(identifier).length;
+        } catch (Exception e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    @Override
+    public boolean exists(final DataIdentifier identifier, final boolean touch)
+            throws DataStoreException {
+        boolean retVal = data.containsKey(identifier);
+        if (retVal && touch) {
+            timeMap.put(identifier, System.currentTimeMillis());
+        }
+        return retVal;
+    }
+
+    private void write(final DataIdentifier identifier, final File file,
+            final boolean async, final AsyncUploadCallback callback)
+            throws DataStoreException {
+        log("write " + identifier + " " + file.length());
+        byte[] buffer = new byte[(int) file.length()];
+        try {
+            if (async && callback == null) {
+                throw new IllegalArgumentException(
+                    "callback parameter cannot be null");
+            }
+            DataInputStream din = new DataInputStream(new FileInputStream(file));
+            din.readFully(buffer);
+            din.close();
+            data.put(identifier, buffer);
+            timeMap.put(identifier, System.currentTimeMillis());
+        } catch (IOException e) {
+            if (async) {
+                callback.call(identifier, file,
+                    AsyncUploadCallback.RESULT.ABORTED);
+            }
+            throw new DataStoreException(e);
+        }
+        if (async) {
+            callback.call(identifier, file, AsyncUploadCallback.RESULT.SUCCESS);
+        }
+    }
+
+    /**
+     * Log a message if logging is enabled.
+     * 
+     * @param message
+     *            the message
+     */
+    private void log(final String message) {
+        // System.out.println(message);
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.CachingDataStore;
+
+/**
+ * A caching data store that uses the in-memory backend.
+ */
+public class InMemoryDataStore extends CachingDataStore {
+
+    @Override
+    protected Backend createBackend() {
+        return new InMemoryBackend();
+    }
+
+    @Override
+    protected String getMarkerFile() {
+        return "mem.init.done";
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java?rev=1577127&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java (added)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java Thu Mar 13 12:12:33 2014
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An input stream that returns pseudo-random bytes.
+ */
+public class RandomInputStream extends InputStream {
+
+    private static final long MUL = 0x5DEECE66DL;
+    private static final long ADD = 0xBL;
+    private static final long MASK = (1L << 48) - 1;
+    private static final int DEFAULT_MAX_READ_BLOCK_SIZE = 15;
+
+    private final long initialSeed;
+    private final long len;
+    private long markedState;
+    private long pos;
+    private long markedPos;
+    private long state;
+    private int maxReadBlockSize;
+
+    public String toString() {
+        return "new RandomInputStream(" + initialSeed + ", " + len + ")";
+    }
+
+    public RandomInputStream(long seed, long len) {
+        this(seed, len, DEFAULT_MAX_READ_BLOCK_SIZE);
+    }
+
+    public static void compareStreams(InputStream a, InputStream b) throws IOException {
+        a = new BufferedInputStream(a);
+        b = new BufferedInputStream(b);
+        long pos = 0;
+        while (true) {
+            int x = a.read();
+            int y = b.read();
+            if (x == -1 || y == -1) {
+                if (x == y) {
+                    break;
+                }
+            }
+            if (x != y) {
+                throw new IOException("Incorrect byte at position " + pos + ": x=" + x + " y=" + y);
+            }
+        }
+    }
+
+    public RandomInputStream(long seed, long len, int maxReadBlockSize) {
+        this.initialSeed = seed;
+        this.len = len;
+        this.maxReadBlockSize = maxReadBlockSize;
+        setSeed(seed);
+        reset();
+    }
+
+    public long skip(long n) {
+        n = getReadBlock(n);
+        if (n == 0) {
+            return -1;
+        }
+        pos += n;
+        return n;
+    }
+
+    private int getReadBlock(long n) {
+        if (n > (len - pos)) {
+            n = (len - pos);
+        }
+        if (n > maxReadBlockSize) {
+            n = maxReadBlockSize;
+        } else if (n < 0) {
+            n = 0;
+        }
+        return (int) n;
+    }
+
+    public int read(byte[] b, int off, int len) {
+        if (pos >= this.len) {
+            return -1;
+        }
+        len = getReadBlock(len);
+        if (len == 0) {
+            return -1;
+        }
+        for (int i = 0; i < len; i++) {
+            b[off + i] = (byte) (next() & 255);
+        }
+        pos += len;
+        return len;
+    }
+
+    public int read(byte[] b) {
+        return read(b, 0, b.length);
+    }
+
+    public void close() {
+        pos = len;
+    }
+
+    private void setSeed(long seed) {
+        markedState = (seed ^ MUL) & MASK;
+    }
+
+    private int next() {
+        state = (state * MUL + ADD) & MASK;
+        return (int) (state >>> (48 - 32));
+    }
+
+    public void reset() {
+        pos = markedPos;
+        state = markedState;
+    }
+
+    public int read() {
+        if (pos >= len) {
+            return -1;
+        }
+        pos++;
+        return next() & 255;
+    }
+
+    public boolean markSupported() {
+        return true;
+    }
+
+    public void mark(int readlimit) {
+        markedPos = pos;
+        markedState = state;
+    }
+
+}

Propchange: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message