Return-Path: X-Original-To: apmail-jackrabbit-commits-archive@www.apache.org Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D6FBC10513 for ; Thu, 13 Mar 2014 12:13:12 +0000 (UTC) Received: (qmail 26348 invoked by uid 500); 13 Mar 2014 12:13:12 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 26305 invoked by uid 500); 13 Mar 2014 12:13:10 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 26070 invoked by uid 99); 13 Mar 2014 12:12:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Mar 2014 12:12:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Mar 2014 12:12:56 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E828D23889D5; Thu, 13 Mar 2014 12:12:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1577127 [2/3] - in /jackrabbit/trunk: jackrabbit-aws-ext/ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/ jackrabbit-aws-ext/src/test/java/org/apache/jackra... 
Date: Thu, 13 Mar 2014 12:12:34 -0000 To: commits@jackrabbit.apache.org From: dpfister@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140313121235.E828D23889D5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java?rev=1577127&r1=1577126&r2=1577127&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java (original) +++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java Thu Mar 13 12:12:33 2014 @@ -20,9 +20,7 @@ package org.apache.jackrabbit.core.data; import java.io.File; import java.io.InputStream; import java.util.Iterator; -import java.util.List; - - +import java.util.Set; /** * The interface defines the backend which can be plugged into @@ -33,9 +31,12 @@ public interface Backend { /** * This method initialize backend with the configuration. * - * @param store {@link CachingDataStore} - * @param homeDir path of repository home dir. - * @param config path of config property file. + * @param store + * {@link CachingDataStore} + * @param homeDir + * path of repository home dir. + * @param config + * path of config property file. * @throws DataStoreException */ void init(CachingDataStore store, String homeDir, String config) @@ -44,27 +45,33 @@ public interface Backend { /** * Return inputstream of record identified by identifier. * - * @param identifier identifier of record. + * @param identifier + * identifier of record. * @return inputstream of the record. - * @throws DataStoreException if record not found or any error. + * @throws DataStoreException + * if record not found or any error. 
*/ InputStream read(DataIdentifier identifier) throws DataStoreException; /** * Return length of record identified by identifier. * - * @param identifier identifier of record. + * @param identifier + * identifier of record. * @return length of the record. - * @throws DataStoreException if record not found or any error. + * @throws DataStoreException + * if record not found or any error. */ long getLength(DataIdentifier identifier) throws DataStoreException; /** * Return lastModified of record identified by identifier. * - * @param identifier identifier of record. + * @param identifier + * identifier of record. * @return lastModified of the record. - * @throws DataStoreException if record not found or any error. + * @throws DataStoreException + * if record not found or any error. */ long getLastModified(DataIdentifier identifier) throws DataStoreException; @@ -72,30 +79,54 @@ public interface Backend { * Stores file to backend with identifier used as key. If key pre-exists, it * updates the timestamp of the key. * - * @param identifier key of the file - * @param file file that would be stored in backend. - * @throws DataStoreException for any error. + * @param identifier + * key of the file + * @param file + * file that would be stored in backend. + * @throws DataStoreException + * for any error. */ void write(DataIdentifier identifier, File file) throws DataStoreException; /** - * Returns identifiers of all records that exists in backend. + * Write file to backend in asynchronous mode. Backend implmentation may + * choose not to write asynchronously but it requires to call + * {@link AsyncUploadCallback#call(DataIdentifier, File, com.day.crx.cloud.s3.ds.AsyncUploadCallback.RESULT)} + * after upload succeed or failed. + * + * @param identifier + * @param file + * @param callback + * Callback interface to called after upload succeed or failed. 
+ * @throws DataStoreException + */ + void writeAsync(DataIdentifier identifier, File file, + AsyncUploadCallback callback) throws DataStoreException; + + /** + * Returns identifiers of all records that exists in backend. + * * @return iterator consisting of all identifiers * @throws DataStoreException */ Iterator getAllIdentifiers() throws DataStoreException; /** - * Update timestamp of record identified by identifier if minModifiedDate is - * greater than record's lastModified else no op. + * This method check the existence of record in backend. Return true if + * records exists else false. This method also touch record identified by + * identifier if touch is true. * - * @throws DataStoreException if record not found. + * @param identifier + * @throws DataStoreException */ - void touch(DataIdentifier identifier, long minModifiedDate) + boolean exists(DataIdentifier identifier, boolean touch) throws DataStoreException; + /** - * This method check the existence of record in backend. - * @param identifier identifier to be checked. + * This method check the existence of record in backend. + * + * @param identifier + * identifier to be checked. * @return true if records exists else false. * @throws DataStoreException */ @@ -103,22 +134,27 @@ public interface Backend { /** * Close backend and release resources like database connection if any. + * * @throws DataStoreException */ void close() throws DataStoreException; /** * Delete all records which are older than timestamp. + * * @param timestamp - * @return list of identifiers which are deleted. + * @return {@link Set} of identifiers which are deleted. * @throws DataStoreException */ - List deleteAllOlderThan(long timestamp) throws DataStoreException; + Set deleteAllOlderThan(long timestamp) + throws DataStoreException; /** * Delete record identified by identifier. No-op if identifier not found. 
+ * * @param identifier * @throws DataStoreException */ void deleteRecord(DataIdentifier identifier) throws DataStoreException; } + Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java?rev=1577127&r1=1577126&r2=1577127&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java (original) +++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java Thu Mar 13 12:12:33 2014 @@ -19,6 +19,8 @@ package org.apache.jackrabbit.core.data; import java.io.InputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * CachingDataRecord which stores reference to {@link CachingDataStore}. This @@ -27,6 +29,8 @@ import java.io.InputStream; */ public class CachingDataRecord extends AbstractDataRecord { + private static final Logger LOG = LoggerFactory.getLogger(CachingDataRecord.class); + private final CachingDataStore store; public CachingDataRecord(CachingDataStore store, DataIdentifier identifier) { @@ -39,6 +43,8 @@ public class CachingDataRecord extends A try { return store.getLastModified(getIdentifier()); } catch (DataStoreException dse) { + LOG.info("exception in getLastModified for identifier [" + + getIdentifier() + "]. 
returning 0.", dse); return 0; } } Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java?rev=1577127&r1=1577126&r2=1577127&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (original) +++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java Thu Mar 13 12:12:33 2014 @@ -29,15 +29,24 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import javax.jcr.RepositoryException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.core.data.util.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,10 +70,13 @@ import org.slf4j.LoggerFactory; * <param name="{@link #setCachePurgeTrigFactor(double)}" value="0.95d"/> * <param name="{@link #setCachePurgeResizeFactor(double) cacheSize}" value="0.85d"/> * <param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/> + * <param name="{@link #setContinueOnAsyncUploadFailure(boolean) continueOnAsyncUploadFailure}" value="false"/> + * <param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/> + * <param 
name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/> * </DataStore> */ public abstract class CachingDataStore extends AbstractDataStore implements - MultiDataStoreAware { + MultiDataStoreAware, AsyncUploadCallback { /** * Logger instance. @@ -88,9 +100,7 @@ public abstract class CachingDataStore e * All data identifiers that are currently in use are in this set until they * are garbage collected. */ - protected Map> inUse = - Collections.synchronizedMap(new WeakHashMap>()); + protected Map> inUse = Collections.synchronizedMap(new WeakHashMap>()); protected Backend backend; @@ -141,11 +151,47 @@ public abstract class CachingDataStore e */ private LocalCache cache; + /** + * Caching holding pending uploads + */ + private AsyncUploadCache asyncWriteCache; + protected abstract Backend createBackend(); protected abstract String getMarkerFile(); /** + * In {@link #init(String)},it resumes all incomplete asynchronous upload + * from {@link AsyncUploadCache} and uploads them concurrently in multiple + * threads. It throws {@link RepositoryException}, if file is not found in + * local cache for that asynchronous upload. As far as code is concerned, it + * is only possible when somebody has removed files from local cache + * manually. If there is an exception and user want to proceed with + * inconsistencies, set parameter continueOnAsyncUploadFailure to true in + * repository.xml. This will ignore {@link RepositoryException} and log all + * missing files and proceed after resetting {@link AsyncUploadCache} . + */ + private boolean continueOnAsyncUploadFailure; + + /** + * The {@link #init(String)} methods checks for {@link #getMarkerFile()} and + * if it doesn't exists migrates all files from fileystem to {@link Backend} + * . This parameter governs number of threads which will upload files + * concurrently to {@link Backend}. 
+ */ + private int concurrentUploadsThreads = 10; + + /** + * This parameter limits the number of asynchronous uploads slots to + * {@link Backend}. Once this limit is reached, further uploads to + * {@link Backend} are synchronous, till one of asynchronous uploads + * completes and make asynchronous uploads slot available. To disable + * asynchronous upload, set {@link #asyncUploadLimit} parameter to 0 in + * repository.xml. By default it is 100 + */ + private int asyncUploadLimit = 100; + + /** * Initialized the data store. If the path is not set, <repository * home>/repository/datastore is used. This directory is automatically * created if it does not yet exist. During first initialization, it upload @@ -154,51 +200,92 @@ public abstract class CachingDataStore e */ @Override public void init(String homeDir) throws RepositoryException { - if (path == null) { - path = homeDir + "/repository/datastore"; - } - directory = new File(path); try { + if (path == null) { + path = homeDir + "/repository/datastore"; + tmpDir = new File(homeDir, "/repository/s3tmp"); + } else { + // cache is moved from 'path' to 'path'/repository/datastore + tmpDir = new File(path, "/repository/s3tmp"); + path = path + "/repository/datastore"; + } + LOG.info("path=[" + path + ",] tmpPath= [" + tmpDir.getPath() + "]"); + directory = new File(path); mkdirs(directory); - } catch (IOException e) { - throw new DataStoreException("Could not create directory " - + directory.getAbsolutePath(), e); - } - tmpDir = new File(homeDir, "/repository/s3tmp"); - try { if (!mkdirs(tmpDir)) { FileUtils.cleanDirectory(tmpDir); LOG.info("tmp = " + tmpDir.getPath() + " cleaned"); } - } catch (IOException e) { - throw new DataStoreException("Could not create directory " - + tmpDir.getAbsolutePath(), e); - } - LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor - + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor); - backend = createBackend(); - backend.init(this, path, config); - String markerFileName 
= getMarkerFile(); - if (markerFileName != null) { - // create marker file in homeDir to avoid deletion in cache cleanup. - File markerFile = new File(homeDir, markerFileName); - if (!markerFile.exists()) { - LOG.info("load files from local cache"); - loadFilesFromCache(); - try { - markerFile.createNewFile(); - } catch (IOException e) { - throw new DataStoreException( + + asyncWriteCache = new AsyncUploadCache(); + asyncWriteCache.init(homeDir, path, asyncUploadLimit); + + backend = createBackend(); + backend.init(this, path, config); + String markerFileName = getMarkerFile(); + if (markerFileName != null) { + // create marker file in homeDir to avoid deletion in cache + // cleanup. + File markerFile = new File(homeDir, markerFileName); + if (!markerFile.exists()) { + LOG.info("load files from local cache"); + uploadFilesFromCache(); + try { + markerFile.createNewFile(); + } catch (IOException e) { + throw new DataStoreException( "Could not create marker file " - + markerFile.getAbsolutePath(), e); - } - } else { - LOG.info("marker file = " + markerFile.getAbsolutePath() + + markerFile.getAbsolutePath(), e); + } + } else { + LOG.info("marker file = " + markerFile.getAbsolutePath() + " exists"); + } } + // upload any leftover async uploads to backend during last shutdown + Set fileList = asyncWriteCache.getAll(); + if (fileList != null && !fileList.isEmpty()) { + List errorFiles = new ArrayList(); + LOG.info("Uploading [" + fileList + "] and size [" + + fileList.size() + "] from AsyncUploadCache."); + long totalSize = 0; + List files = new ArrayList(fileList.size()); + for (String fileName : fileList) { + File f = new File(path, fileName); + if (!f.exists()) { + errorFiles.add(fileName); + LOG.error("Cannot upload pending file [" + + f.getAbsolutePath() + "]. 
File doesn't exist."); + } else { + totalSize += f.length(); + files.add(new File(homeDir, fileName)); + } + } + new FilesUploader(files, totalSize, concurrentUploadsThreads, + true).upload(); + if (!continueOnAsyncUploadFailure && errorFiles.size() > 0) { + LOG.error("Pending uploads of files [" + errorFiles + + "] failed. Files do not exist in Local cache."); + LOG.error("To continue set [continueOnAsyncUploadFailure] to true in Datastore configuration in repository.xml." + + " There would be inconsistent data in repository due the missing files. "); + throw new RepositoryException( + "Cannot upload async uploads from local cache. Files not found."); + } else { + if (errorFiles.size() > 0) { + LOG.error("Pending uploads of files [" + + errorFiles + + "] failed. Files do not exist" + + " in Local cache. Continuing as [continueOnAsyncUploadFailure] is set to true."); + } + LOG.info("Reseting AsyncWrite Cache list."); + asyncWriteCache.reset(); + } + } + cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize, + cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache); + } catch (Exception e) { + throw new RepositoryException(e); } - cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize, - cachePurgeTrigFactor, cachePurgeResizeFactor); } /** @@ -218,6 +305,8 @@ public abstract class CachingDataStore e @Override public DataRecord addRecord(InputStream input) throws DataStoreException { File temporary = null; + long startTime = System.currentTimeMillis(); + long length = 0; try { temporary = newTemporaryFile(); DataIdentifier tempId = new DataIdentifier(temporary.getName()); @@ -226,23 +315,47 @@ public abstract class CachingDataStore e // stream length and the message digest of the stream MessageDigest digest = MessageDigest.getInstance(DIGEST); OutputStream output = new DigestOutputStream(new FileOutputStream( - temporary), digest); + temporary), digest); try { - IOUtils.copyLarge(input, output); + length = IOUtils.copyLarge(input, output); 
} finally { output.close(); } + long currTime = System.currentTimeMillis(); DataIdentifier identifier = new DataIdentifier( - encodeHexString(digest.digest())); + encodeHexString(digest.digest())); + if (LOG.isDebugEnabled()) { + LOG.debug("getting SHA1 hash [" + identifier + "] length [" + + length + "], in [" + (currTime - startTime) + "] ms."); + } + String fileName = getFileName(identifier); + AsyncUploadCacheResult result = null; synchronized (this) { usesIdentifier(identifier); - backend.write(identifier, temporary); - String fileName = getFileName(identifier); - cache.store(fileName, temporary); + // check if async upload is already in progress + if (!asyncWriteCache.hasEntry(fileName, true)) { + result = cache.store(fileName, temporary, true); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("storing [" + identifier + "] in localCache took [" + + (System.currentTimeMillis() - currTime) + "] ms."); + } + if (result != null) { + if (result.canAsyncUpload()) { + backend.writeAsync(identifier, result.getFile(), this); + } else { + backend.write(identifier, result.getFile()); + } } // this will also make sure that // tempId is not garbage collected until here inUse.remove(tempId); + if (LOG.isDebugEnabled()) { + LOG.debug("write [" + identifier + "] length [" + length + + "], in [" + (System.currentTimeMillis() - startTime) + + "] ms."); + } return new CachingDataRecord(this, identifier); } catch (NoSuchAlgorithmException e) { throw new DataStoreException(DIGEST + " not available", e); @@ -256,6 +369,35 @@ public abstract class CachingDataStore e } } + @Override + public DataRecord getRecord(DataIdentifier identifier) + throws DataStoreException { + String fileName = getFileName(identifier); + boolean touch = minModifiedDate > 0 ? 
true : false; + synchronized (this) { + try { + if (asyncWriteCache.hasEntry(fileName, touch)) { + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } else if (cache.getFileIfStored(fileName) != null) { + if (touch) { + backend.exists(identifier, touch); + } + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } else if (backend.exists(identifier, touch)) { + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } + + } catch (IOException ioe) { + throw new DataStoreException("error in getting record [" + + identifier + "]", ioe); + } + } + throw new DataStoreException("Record not found: " + identifier); + } + /** * Get a data record for the given identifier or null it data record doesn't * exist in {@link Backend} @@ -267,14 +409,20 @@ public abstract class CachingDataStore e @Override public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException { + String fileName = getFileName(identifier); + boolean touch = minModifiedDate > 0 ? 
true : false; synchronized (this) { - usesIdentifier(identifier); - if (!backend.exists(identifier)) { - return null; + try { + if (asyncWriteCache.hasEntry(fileName, touch) + || backend.exists(identifier, touch)) { + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } + } catch (IOException ioe) { + throw new DataStoreException(ioe); } - backend.touch(identifier, minModifiedDate); - return new CachingDataRecord(this, identifier); } + return null; } @Override @@ -289,7 +437,15 @@ public abstract class CachingDataStore e @Override public Iterator getAllIdentifiers() throws DataStoreException { - return backend.getAllIdentifiers(); + Set ids = new HashSet(); + for (String fileName : asyncWriteCache.getAll()) { + ids.add(getIdentifier(fileName)); + } + Iterator itr = backend.getAllIdentifiers(); + while (itr.hasNext()) { + ids.add(itr.next()); + } + return ids.iterator(); } /** @@ -301,20 +457,35 @@ public abstract class CachingDataStore e throws DataStoreException { String fileName = getFileName(identifier); synchronized (this) { - backend.deleteRecord(identifier); - cache.delete(fileName); + try { + // order is important here + asyncWriteCache.delete(fileName); + backend.deleteRecord(identifier); + cache.delete(fileName); + } catch (IOException ioe) { + throw new DataStoreException(ioe); + } } } @Override public synchronized int deleteAllOlderThan(long min) throws DataStoreException { - List diList = backend.deleteAllOlderThan(min); + Set diSet = backend.deleteAllOlderThan(min); // remove entries from local cache - for (DataIdentifier identifier : diList) { + for (DataIdentifier identifier : diSet) { cache.delete(getFileName(identifier)); } - return diList.size(); + try { + for (String fileName : asyncWriteCache.deleteOlderThan(min)) { + diSet.add(getIdentifier(fileName)); + } + } catch (IOException e) { + throw new DataStoreException(e); + } + LOG.info("deleteAllOlderThan exit. Deleted [" + diSet + + "] records. 
Number of records deleted [" + diSet.size() + "]"); + return diSet.size(); } /** @@ -344,9 +515,23 @@ public abstract class CachingDataStore e * Return lastModified of record from {@link Backend} assuming * {@link Backend} as a single source of truth. */ - public long getLastModified(DataIdentifier identifier) throws DataStoreException { - LOG.info("accessed lastModified"); - return backend.getLastModified(identifier); + public long getLastModified(DataIdentifier identifier) + throws DataStoreException { + if (LOG.isDebugEnabled()) { + LOG.debug("accessed lastModified of identifier:" + identifier); + } + String fileName = getFileName(identifier); + long lastModified = asyncWriteCache.getLastModified(fileName); + if (lastModified != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("getlastModified of identifier [" + identifier + + "] from AsyncUploadCache = " + lastModified); + } + return lastModified; + + } else { + return backend.getLastModified(identifier); + } } /** @@ -358,6 +543,22 @@ public abstract class CachingDataStore e Long length = cache.getFileLength(fileName); if (length != null) { return length.longValue(); + } else { + InputStream in = null; + InputStream cachedStream = null; + try { + in = backend.read(identifier); + cachedStream = cache.store(fileName, in); + } catch (IOException e) { + throw new DataStoreException("IO Exception: " + identifier, e); + } finally { + IOUtils.closeQuietly(in); + IOUtils.closeQuietly(cachedStream); + } + length = cache.getFileLength(fileName); + if (length != null) { + return length.longValue(); + } } return backend.getLength(identifier); } @@ -371,6 +572,52 @@ public abstract class CachingDataStore e } } + public Set getPendingUploads() { + return asyncWriteCache.getAll(); + } + + public void call(DataIdentifier identifier, File file, + AsyncUploadCallback.RESULT resultCode) { + String fileName = getFileName(identifier); + if (AsyncUploadCallback.RESULT.SUCCESS.equals(resultCode)) { + try { + if (LOG.isDebugEnabled()) 
{ + LOG.debug("Upload completed. [" + identifier + "]."); + } + AsyncUploadCacheResult result = asyncWriteCache.remove(fileName); + if (result.doRequiresDelete()) { + // added record already marked for delete + deleteRecord(identifier); + } + } catch (IOException ie) { + LOG.warn("Cannot remove pending file upload. Dataidentifer [ " + + identifier + "], file [" + file.getAbsolutePath() + "]", + ie); + } catch (DataStoreException dse) { + LOG.warn("Cannot remove pending file upload. Dataidentifer [ " + + identifier + "], file [" + file.getAbsolutePath() + "]", + dse); + } + } else if (AsyncUploadCallback.RESULT.FAILED.equals(resultCode)) { + LOG.error("Async Upload failed. Dataidentifer [ " + identifier + + "], file [" + file.getAbsolutePath() + "]"); + } else if (AsyncUploadCallback.RESULT.ABORTED.equals(resultCode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Async Upload Aborted. Dataidentifer [ " + identifier + + "], file [" + file.getAbsolutePath() + "]"); + } + try { + asyncWriteCache.remove(fileName); + LOG.info("Async Upload Aborted. Dataidentifer [ " + identifier + + "], file [" + file.getAbsolutePath() + "] removed."); + } catch (IOException ie) { + LOG.warn("Cannot remove pending file upload. Dataidentifer [ " + + identifier + "], file [" + file.getAbsolutePath() + "]", + ie); + } + } + } + /** * Returns a unique temporary file to be used for creating a new data * record. @@ -382,36 +629,57 @@ public abstract class CachingDataStore e /** * Load files from {@link LocalCache} to {@link Backend}. 
*/ - private void loadFilesFromCache() throws RepositoryException { + private void uploadFilesFromCache() throws RepositoryException { ArrayList files = new ArrayList(); listRecursive(files, directory); long totalSize = 0; for (File f : files) { totalSize += f.length(); } + if (concurrentUploadsThreads > 1) { + new FilesUploader(files, totalSize, concurrentUploadsThreads, false).upload(); + } else { + uploadFilesInSingleThread(files, totalSize); + } + } + + private void uploadFilesInSingleThread(List files, long totalSize) + throws RepositoryException { + long startTime = System.currentTimeMillis(); + LOG.info("Upload: {" + files.size() + "} files in single thread."); + long currentCount = 0; long currentSize = 0; long time = System.currentTimeMillis(); for (File f : files) { long now = System.currentTimeMillis(); if (now > time + 5000) { - LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}"); + LOG.info("Uploaded: {" + currentCount + "}/{" + files.size() + + "} files, {" + currentSize + "}/{" + totalSize + + "} size data"); time = now; } - currentSize += f.length(); String name = f.getName(); - LOG.debug("upload file = " + name); + if (LOG.isDebugEnabled()) { + LOG.debug("upload file = " + name); + } if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) - && f.length() > 0) { - loadFileToBackEnd(f); + && f.length() > 0) { + uploadFileToBackEnd(f, false); } + currentSize += f.length(); + currentCount++; } - LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}"); + long endTime = System.currentTimeMillis(); + LOG.info("Uploaded: {" + currentCount + "}/{" + files.size() + + "} files, {" + currentSize + "}/{" + totalSize + + "} size data, time taken {" + ((endTime - startTime) / 1000) + + "} sec"); } /** * Traverse recursively and populate list with files. 
*/ - private void listRecursive(List list, File file) { + private static void listRecursive(List list, File file) { File[] files = file.listFiles(); if (files != null) { for (File f : files) { @@ -431,12 +699,22 @@ public abstract class CachingDataStore e * file to uploaded. * @throws DataStoreException */ - private void loadFileToBackEnd(File f) throws DataStoreException { - DataIdentifier identifier = new DataIdentifier(f.getName()); - usesIdentifier(identifier); - backend.write(identifier, f); - LOG.debug(f.getName() + "uploaded."); - + private void uploadFileToBackEnd(File f, boolean updateAsyncUploadCache) + throws DataStoreException { + try { + DataIdentifier identifier = new DataIdentifier(f.getName()); + usesIdentifier(identifier); + backend.write(identifier, f); + if (updateAsyncUploadCache) { + String fileName = getFileName(identifier); + asyncWriteCache.remove(fileName); + } + if (LOG.isDebugEnabled()) { + LOG.debug(f.getName() + "uploaded."); + } + } catch (IOException ioe) { + throw new DataStoreException(ioe); + } } /** @@ -444,9 +722,17 @@ public abstract class CachingDataStore e */ private static String getFileName(DataIdentifier identifier) { String name = identifier.toString(); - name = name.substring(0, 2) + "/" + name.substring(2, 4) + "/" - + name.substring(4, 6) + "/" + name; - return name; + return getFileName(name); + } + + private static String getFileName(String name) { + return name.substring(0, 2) + "/" + name.substring(2, 4) + "/" + + name.substring(4, 6) + "/" + name; + } + + private static DataIdentifier getIdentifier(String fileName) { + return new DataIdentifier( + fileName.substring(fileName.lastIndexOf("/") + 1)); } private void usesIdentifier(DataIdentifier identifier) { @@ -457,15 +743,15 @@ public abstract class CachingDataStore e if (dir.exists()) { if (dir.isFile()) { throw new IOException("Can not create a directory " - + "because a file exists with the same name: " - + dir.getAbsolutePath()); + + "because a file exists with 
the same name: " + + dir.getAbsolutePath()); } return false; } boolean created = dir.mkdirs(); if (!created) { throw new IOException("Could not create directory: " - + dir.getAbsolutePath()); + + dir.getAbsolutePath()); } return created; } @@ -483,7 +769,6 @@ public abstract class CachingDataStore e public void close() throws DataStoreException { cache.close(); backend.close(); - cache = null; } /** @@ -551,7 +836,6 @@ public abstract class CachingDataStore e } /** - * * @return path of {@link LocalCache}. */ public String getPath() { @@ -602,4 +886,208 @@ public abstract class CachingDataStore e this.cachePurgeResizeFactor = cachePurgeResizeFactor; } + public int getConcurrentUploadsThreads() { + return concurrentUploadsThreads; + } + + public void setConcurrentUploadsThreads(int concurrentUploadsThreads) { + this.concurrentUploadsThreads = concurrentUploadsThreads; + } + + public int getAsyncUploadLimit() { + return asyncUploadLimit; + } + + public void setAsyncUploadLimit(int asyncUploadLimit) { + this.asyncUploadLimit = asyncUploadLimit; + } + + public boolean isContinueOnAsyncUploadFailure() { + return continueOnAsyncUploadFailure; + } + + public void setContinueOnAsyncUploadFailure( + boolean continueOnAsyncUploadFailure) { + this.continueOnAsyncUploadFailure = continueOnAsyncUploadFailure; + } + + public Backend getBackend() { + return backend; + } + + /** + * This class initiates files upload in multiple threads to backend. 
+ */ + private class FilesUploader { + final List files; + + final long totalSize; + + volatile AtomicInteger currentCount = new AtomicInteger(); + + volatile AtomicLong currentSize = new AtomicLong(); + + volatile AtomicBoolean exceptionRaised = new AtomicBoolean(); + + DataStoreException exception; + + final int threads; + + final boolean updateAsyncCache; + + FilesUploader(List files, long totalSize, int threads, + boolean updateAsyncCache) { + super(); + this.files = files; + this.threads = threads; + this.totalSize = totalSize; + this.updateAsyncCache = updateAsyncCache; + } + + void addCurrentCount(int delta) { + currentCount.addAndGet(delta); + } + + void addCurrentSize(long delta) { + currentSize.addAndGet(delta); + } + + synchronized void setException(DataStoreException exception) { + exceptionRaised.getAndSet(true); + this.exception = exception; + } + + boolean isExceptionRaised() { + return exceptionRaised.get(); + } + + void logProgress() { + LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size() + + "} files, {" + currentSize.get() + "}/{" + totalSize + + "} size data"); + } + + void upload() throws DataStoreException { + long startTime = System.currentTimeMillis(); + LOG.info(" Uploading " + files.size() + " using " + threads + + " threads."); + ExecutorService executor = Executors.newFixedThreadPool(threads, + new NamedThreadFactory("backend-file-upload-worker")); + int partitionSize = files.size() / (threads); + int startIndex = 0; + int endIndex = partitionSize; + for (int i = 1; i <= threads; i++) { + List partitionFileList = Collections.unmodifiableList(files.subList( + startIndex, endIndex)); + FileUploaderThread fut = new FileUploaderThread( + partitionFileList, startIndex, endIndex, this, + updateAsyncCache); + executor.execute(fut); + + startIndex = endIndex; + if (i == (threads - 1)) { + endIndex = files.size(); + } else { + endIndex = startIndex + partitionSize; + } + } + // This will make the executor accept no new threads + // 
and finish all existing threads in the queue + executor.shutdown(); + + try { + // Wait until all threads are finish + while (!isExceptionRaised() + && !executor.awaitTermination(15, TimeUnit.SECONDS)) { + logProgress(); + } + } catch (InterruptedException ie) { + + } + long endTime = System.currentTimeMillis(); + LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size() + + "} files, {" + currentSize.get() + "}/{" + totalSize + + "} size data, time taken {" + ((endTime - startTime) / 1000) + + "} sec"); + if (isExceptionRaised()) { + executor.shutdownNow(); // Cancel currently executing tasks + throw exception; + } + } + + } + + /** + * This class implements {@link Runnable} interface and uploads list of + * files from startIndex to endIndex to {@link Backend} + */ + private class FileUploaderThread implements Runnable { + final List files; + + final FilesUploader filesUploader; + + final int startIndex; + + final int endIndex; + + final boolean updateAsyncCache; + + FileUploaderThread(List files, int startIndex, int endIndex, + FilesUploader controller, boolean updateAsyncCache) { + super(); + this.files = files; + this.filesUploader = controller; + this.startIndex = startIndex; + this.endIndex = endIndex; + this.updateAsyncCache = updateAsyncCache; + } + + public void run() { + long time = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Thread [ " + Thread.currentThread().getName() + + "]: Uploading files from startIndex[" + startIndex + + "] and endIndex [" + (endIndex - 1) + + "], both inclusive."); + } + int uploadCount = 0; + long uploadSize = 0; + try { + for (File f : files) { + + if (filesUploader.isExceptionRaised()) { + break; + } + String name = f.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("upload file = " + name); + } + if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) + && f.length() > 0) { + uploadFileToBackEnd(f, updateAsyncCache); + } + uploadCount++; + uploadSize += f.length(); + // update upload 
status at every 15 seconds. + long now = System.currentTimeMillis(); + if (now > time + 15000) { + filesUploader.addCurrentCount(uploadCount); + filesUploader.addCurrentSize(uploadSize); + uploadCount = 0; + uploadSize = 0; + time = now; + } + } + // update final state. + filesUploader.addCurrentCount(uploadCount); + filesUploader.addCurrentSize(uploadSize); + } catch (DataStoreException e) { + if (!filesUploader.isExceptionRaised()) { + filesUploader.setException(e); + } + } + + } + } + } Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java?rev=1577127&r1=1577126&r2=1577127&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (original) +++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java Thu Mar 13 12:12:33 2014 @@ -84,6 +84,8 @@ public class LocalCache { * no-op. */ private volatile boolean purgeMode; + + private AsyncUploadCache asyncUploadCache; /** * Build LRU cache of files located at 'path'. It uses lastModified property @@ -98,66 +100,22 @@ public class LocalCache { * cache will go in auto-purge mode. * @param cachePurgeResizeFactor after cache purge size of cache will be * just less (cachePurgeResizeFactor * maxSize). 
+ * @param asyncUploadCache {@link AsyncUploadCache} * @throws RepositoryException */ - public LocalCache(final String path, final String tmpPath, - final long maxSize, final double cachePurgeTrigFactor, - final double cachePurgeResizeFactor) throws RepositoryException { - this.maxSize = maxSize; + public LocalCache(String path, String tmpPath, long size, double cachePurgeTrigFactor, + double cachePurgeResizeFactor, AsyncUploadCache asyncUploadCache) throws IOException, + ClassNotFoundException { + this.maxSize = size; directory = new File(path); tmp = new File(tmpPath); - cache = new LRUCache(maxSize, cachePurgeTrigFactor, - cachePurgeResizeFactor); - ArrayList allFiles = new ArrayList(); - - Iterator it = FileUtils.iterateFiles(directory, null, true); - while (it.hasNext()) { - File f = it.next(); - allFiles.add(f); - } - Collections.sort(allFiles, new Comparator() { - @Override - public int compare(final File o1, final File o2) { - long l1 = o1.lastModified(), l2 = o2.lastModified(); - return l1 < l2 ? -1 : l1 > l2 ? 
1 : 0; - } - }); - String dataStorePath = directory.getAbsolutePath(); - long time = System.currentTimeMillis(); - int count = 0; - int deletecount = 0; - for (File f : allFiles) { - if (f.exists()) { - long length = f.length(); - String name = f.getPath(); - if (name.startsWith(dataStorePath)) { - name = name.substring(dataStorePath.length()); - } - // convert to java path format - name = name.replace("\\", "/"); - if (name.startsWith("/") || name.startsWith("\\")) { - name = name.substring(1); - } - if ((cache.currentSizeInBytes + length) < cache.maxSizeInBytes) { - count++; - cache.put(name, length); - } else { - if (tryDelete(name)) { - deletecount++; - } - } - long now = System.currentTimeMillis(); - if (now > time + 5000) { - LOG.info("Processed {" + (count + deletecount) + "}/{" - + allFiles.size() + "}"); - time = now; - } - } - } - LOG.info("Cached {" + count + "}/{" + allFiles.size() - + "} , currentSizeInBytes = " + cache.currentSizeInBytes); - LOG.info("Deleted {" + deletecount + "}/{" + allFiles.size() - + "} files ."); + LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor + + ", cachePurgeTrigFactorSize = " + (cachePurgeTrigFactor * size) + ", cachePurgeResizeFactor = " + + (cachePurgeResizeFactor * size)); + cache = new LRUCache(size, cachePurgeTrigFactor, cachePurgeResizeFactor); + this.asyncUploadCache = asyncUploadCache; + + new Thread(new CacheBuildJob()).start(); } /** @@ -168,51 +126,50 @@ public class LocalCache { * doesn't close the incoming inputstream. * * @param fileName the key of cache. - * @param in the inputstream. + * @param in {@link InputStream} * @return the (new) input stream. 
*/ - public synchronized InputStream store(String fileName, final InputStream in) + public InputStream store(String fileName, final InputStream in) throws IOException { fileName = fileName.replace("\\", "/"); File f = getFile(fileName); long length = 0; - if (!f.exists() || isInPurgeMode()) { - OutputStream out = null; - File transFile = null; - try { - TransientFileFactory tff = TransientFileFactory.getInstance(); - transFile = tff.createTransientFile("s3-", "tmp", tmp); - out = new BufferedOutputStream(new FileOutputStream(transFile)); - length = IOUtils.copyLarge(in, out); - } finally { - IOUtils.closeQuietly(out); - } - // rename the file to local fs cache - if (canAdmitFile(length) - && (f.getParentFile().exists() || f.getParentFile().mkdirs()) - && transFile.renameTo(f) && f.exists()) { - if (transFile.exists() && transFile.delete()) { - LOG.warn("tmp file = " + transFile.getAbsolutePath() - + " not deleted successfully"); - } - transFile = null; - toBeDeleted.remove(fileName); - if (cache.get(fileName) == null) { + synchronized (this) { + if (!f.exists() || isInPurgeMode()) { + OutputStream out = null; + File transFile = null; + try { + TransientFileFactory tff = TransientFileFactory.getInstance(); + transFile = tff.createTransientFile("s3-", "tmp", tmp); + out = new BufferedOutputStream(new FileOutputStream(transFile)); + length = IOUtils.copyLarge(in, out); + } finally { + IOUtils.closeQuietly(out); + } + // rename the file to local fs cache + if (canAdmitFile(length) + && (f.getParentFile().exists() || f.getParentFile().mkdirs()) + && transFile.renameTo(f) && f.exists()) { + if (transFile.exists() && transFile.delete()) { + LOG.info("tmp file = " + transFile.getAbsolutePath() + + " not deleted successfully"); + } + transFile = null; + if (LOG.isDebugEnabled()) { + LOG.debug("file [" + fileName + "] added to local cache."); + } cache.put(fileName, f.length()); + } else { + f = transFile; } } else { - f = transFile; - } - } else { - // f.exists and not in 
purge mode - f.setLastModified(System.currentTimeMillis()); - toBeDeleted.remove(fileName); - if (cache.get(fileName) == null) { + // f.exists and not in purge mode + f.setLastModified(System.currentTimeMillis()); cache.put(fileName, f.length()); } + cache.tryPurge(); + return new LazyFileInputStream(f); } - cache.tryPurge(); - return new LazyFileInputStream(f); } /** @@ -224,29 +181,59 @@ public class LocalCache { * @param src file to be added to cache. * @throws IOException */ - public synchronized void store(String fileName, final File src) - throws IOException { + public synchronized File store(String fileName, final File src) { + try { + return store(fileName, src, false).getFile(); + } catch (IOException ioe) { + LOG.warn("Exception in addding file [" + fileName + "] to local cache.", ioe); + } + return null; + } + + /** + * This method add file to {@link LocalCache} and tries that file can be + * added to {@link AsyncUploadCache}. If file is added to + * {@link AsyncUploadCache} successfully, it sets + * {@link AsyncUploadResult#setAsyncUpload(boolean)} to true. + * + * @param fileName name of the file. + * @param src source file. + * @param tryForAsyncUpload If true it tries to add fileName to + * {@link AsyncUploadCache} + * @return {@link AsyncUploadCacheResult}. This method sets + * {@link AsyncUploadResult#setAsyncUpload(boolean)} to true, if + * fileName is added to {@link AsyncUploadCache} successfully else + * it sets {@link AsyncUploadCacheResult#setAsyncUpload(boolean)} to + * false. {@link AsyncUploadCacheResult#getFile()} contains cached + * file, if it is added to {@link LocalCache} or original file. 
+ * @throws IOException + */ + public synchronized AsyncUploadCacheResult store(String fileName, File src, boolean tryForAsyncUpload) throws IOException { fileName = fileName.replace("\\", "/"); File dest = getFile(fileName); File parent = dest.getParentFile(); - if (src.exists() && !dest.exists() && !src.equals(dest) - && canAdmitFile(src.length()) - && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest))) { - toBeDeleted.remove(fileName); - if (cache.get(fileName) == null) { - cache.put(fileName, dest.length()); - } - - } else if (dest.exists()) { - dest.setLastModified(System.currentTimeMillis()); - toBeDeleted.remove(fileName); - if (cache.get(fileName) == null) { - cache.put(fileName, dest.length()); + AsyncUploadCacheResult result = new AsyncUploadCacheResult(); + result.setFile(src); + result.setAsyncUpload(false); + boolean destExists = false; + if ((destExists = dest.exists()) + || (src.exists() && !dest.exists() && !src.equals(dest) && canAdmitFile(src.length()) + && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest)))) { + if (destExists) { + dest.setLastModified(System.currentTimeMillis()); + } + cache.put(fileName, dest.length()); + if (LOG.isDebugEnabled()) { + LOG.debug("file [" + fileName + "] added to local cache."); + } + result.setFile(dest); + if (tryForAsyncUpload) { + result.setAsyncUpload(asyncUploadCache.add(fileName).canAsyncUpload()); } } cache.tryPurge(); + return result; } - /** * Return the inputstream from from cache, or null if not in the cache. * @@ -254,16 +241,23 @@ public class LocalCache { * @return stream or null. */ public InputStream getIfStored(String fileName) throws IOException { + File file = getFileIfStored(fileName); + return file == null ? 
null : new LazyFileInputStream(file); + } + public synchronized File getFileIfStored(String fileName) throws IOException { fileName = fileName.replace("\\", "/"); File f = getFile(fileName); - synchronized (this) { - if (!f.exists() || isInPurgeMode()) { - log("purgeMode true or file doesn't exists: getIfStored returned"); - return null; - } + // return file in purge mode = true and file present in asyncUploadCache + // as asyncUploadCache's files will be not be deleted in cache purge. + if (!f.exists() || (isInPurgeMode() && !asyncUploadCache.hasEntry(fileName, false))) { + log("purgeMode true or file doesn't exists: getFileIfStored returned"); + return null; + } else { + // touch entry in LRU caches + cache.put(fileName, f.length()); f.setLastModified(System.currentTimeMillis()); - return new LazyFileInputStream(f); + return f; } } @@ -286,17 +280,20 @@ public class LocalCache { * Returns length of file if exists in cache else returns null. * @param fileName name of the file. */ - public Long getFileLength(String fileName) { - fileName = fileName.replace("\\", "/"); - File f = getFile(fileName); - synchronized (this) { - if (!f.exists() || isInPurgeMode()) { - log("purgeMode true or file doesn't exists: getFileLength returned"); - return null; + public synchronized Long getFileLength(String fileName) { + Long length = null; + try { + length = cache.get(fileName); + if( length == null ) { + File f = getFileIfStored(fileName); + if (f != null) { + length = f.length(); + } } - f.setLastModified(System.currentTimeMillis()); - return f.length(); + } catch (IOException ignore) { + } + return length; } /** @@ -315,11 +312,10 @@ public class LocalCache { * @return true if yes else return false. 
*/ private synchronized boolean canAdmitFile(final long length) { - // order is important here - boolean value = !isInPurgeMode() && cache.canAdmitFile(length); + //order is important here + boolean value = !isInPurgeMode() && (cache.canAdmitFile(length)); if (!value) { - log("cannot admit file of length=" + length - + " and currentSizeInBytes=" + cache.currentSizeInBytes); + log("cannot admit file of length=" + length + " and currentSizeInBytes=" + cache.currentSizeInBytes); } return value; } @@ -410,11 +406,11 @@ public class LocalCache { final long maxSizeInBytes; - long cachePurgeResize; + final long cachePurgeResize; - private long cachePurgeTrigSize; + final long cachePurgeTrigSize; - public LRUCache(final long maxSizeInBytes, + LRUCache(final long maxSizeInBytes, final double cachePurgeTrigFactor, final double cachePurgeResizeFactor) { super(maxSizeElements(maxSizeInBytes), (float) 0.75, true); @@ -433,20 +429,32 @@ public class LocalCache { public synchronized Long remove(final Object key) { String fileName = (String) key; fileName = fileName.replace("\\", "/"); + try { + // not removing file from local cache, if there is in progress + // async upload on it. + if (asyncUploadCache.hasEntry(fileName, false)) { + LOG.info("AsyncUploadCache upload contains file [" + fileName + + "]. Not removing it from LocalCache."); + return null; + } + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("error: ", e); + } + return null; + } Long flength = null; if (tryDelete(fileName)) { flength = super.remove(key); if (flength != null) { - log("cache entry { " + fileName + "} with size {" + flength - + "} removed."); + log("cache entry { " + fileName + "} with size {" + flength + "} removed."); currentSizeInBytes -= flength.longValue(); } } else if (!getFile(fileName).exists()) { // second attempt. remove from cache if file doesn't exists flength = super.remove(key); if (flength != null) { - log(" file not exists. 
cache entry { " + fileName - + "} with size {" + flength + "} removed."); + log(" file not exists. cache entry { " + fileName + "} with size {" + flength + "} removed."); currentSizeInBytes -= flength.longValue(); } } @@ -454,10 +462,15 @@ public class LocalCache { } @Override - public synchronized Long put(final String key, final Long value) { - long flength = value.longValue(); - currentSizeInBytes += flength; - return super.put(key.replace("\\", "/"), value); + public synchronized Long put(final String fileName, final Long value) { + Long oldValue = cache.get(fileName); + if (oldValue == null) { + long flength = value.longValue(); + currentSizeInBytes += flength; + return super.put(fileName.replace("\\", "/"), value); + } + toBeDeleted.remove(fileName); + return oldValue; } /** @@ -468,10 +481,14 @@ public class LocalCache { synchronized void tryPurge() { if (currentSizeInBytes > cachePurgeTrigSize && !isInPurgeMode()) { setPurgeMode(true); - LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes - + "] exceeds (cachePurgeTrigSize)[" - + cache.cachePurgeTrigSize + "]"); + LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes + "] exceeds (cachePurgeTrigSize)[" + cache.cachePurgeTrigSize + + "]"); new Thread(new PurgeJob()).start(); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("currentSizeInBytes[" + cache.currentSizeInBytes + "] and (cachePurgeTrigSize)[" + cache.cachePurgeTrigSize + + "], isInPurgeMode =[" + isInPurgeMode() + "]"); + } } } /** @@ -532,4 +549,64 @@ public class LocalCache { } } } + + /** + * This class implements {@link Runnable} interface to build LRU cache + * asynchronously. 
+ */ + private class CacheBuildJob implements Runnable { + public void run() { + long startTime = System.currentTimeMillis(); + ArrayList allFiles = new ArrayList(); + Iterator it = FileUtils.iterateFiles(directory, null, true); + while (it.hasNext()) { + File f = it.next(); + allFiles.add(f); + } + long t1 = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Time taken to recursive [" + allFiles.size() + "] took [" + ((t1 - startTime) / 1000) + "]sec"); + } + Collections.sort(allFiles, new Comparator() { + public int compare(File o1, File o2) { + long l1 = o1.lastModified(), l2 = o2.lastModified(); + return l1 < l2 ? -1 : l1 > l2 ? 1 : 0; + } + }); + long t2 = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Time taken to sort [" + allFiles.size() + "] took [" + ((t2 - t1) / 1000) + "]sec"); + } + String dataStorePath = directory.getAbsolutePath(); + long time = System.currentTimeMillis(); + int count = 0; + for (File f : allFiles) { + if (f.exists()) { + count++; + String name = f.getPath(); + if (name.startsWith(dataStorePath)) { + name = name.substring(dataStorePath.length()); + } + // convert to java path format + name = name.replace("\\", "/"); + if (name.startsWith("/") || name.startsWith("\\")) { + name = name.substring(1); + } + store(name, f); + long now = System.currentTimeMillis(); + if (now > time + 10000) { + LOG.info("Processed {" + (count) + "}/{" + allFiles.size() + "}"); + time = now; + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Processed {" + count + "}/{" + allFiles.size() + "} , currentSizeInBytes = " + cache.currentSizeInBytes + + ", maxSizeInBytes = " + cache.maxSizeInBytes + ", cache.filecount = " + cache.size()); + } + long t3 = System.currentTimeMillis(); + LOG.info("Time to build cache of [" + allFiles.size() + "] took [" + ((t3 - startTime) / 1000) + "]sec"); + } + } } + Added: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java 
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java?rev=1577127&view=auto ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java (added) +++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java Thu Mar 13 12:12:33 2014 @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.core.data.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class extends {@link ThreadFactory} to creates named threads. 
+ */ +public class NamedThreadFactory implements ThreadFactory { + + private AtomicInteger threadCount = new AtomicInteger(1); + + String threadPrefixName; + + public NamedThreadFactory(String threadPrefixName) { + super(); + this.threadPrefixName = threadPrefixName; + } + + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setContextClassLoader(getClass().getClassLoader()); + thread.setName(threadPrefixName + "-" + threadCount.getAndIncrement()); + return thread; + } + +} Propchange: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/util/NamedThreadFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java?rev=1577127&view=auto ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (added) +++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java Thu Mar 13 12:12:33 2014 @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.core.data; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.jackrabbit.core.data.AsyncUploadCallback; +import org.apache.jackrabbit.core.data.Backend; +import org.apache.jackrabbit.core.data.CachingDataStore; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataStoreException; + +/** + * An in-memory backend implementation used to speed up testing. 
+ */ +public class InMemoryBackend implements Backend { + + private HashMap data = new HashMap(); + + private HashMap timeMap = new HashMap(); + + @Override + public void init(CachingDataStore store, String homeDir, String config) + throws DataStoreException { + // ignore + log("init"); + } + + @Override + public void close() { + // ignore + log("close"); + } + + @Override + public boolean exists(final DataIdentifier identifier) { + log("exists " + identifier); + return data.containsKey(identifier); + } + + @Override + public Iterator getAllIdentifiers() + throws DataStoreException { + log("getAllIdentifiers"); + return data.keySet().iterator(); + } + + @Override + public InputStream read(final DataIdentifier identifier) + throws DataStoreException { + log("read " + identifier); + return new ByteArrayInputStream(data.get(identifier)); + } + + @Override + public void writeAsync(final DataIdentifier identifier, final File file, + final AsyncUploadCallback callback) throws DataStoreException { + this.write(identifier, file, true, callback); + } + + @Override + public void write(final DataIdentifier identifier, final File file) + throws DataStoreException { + this.write(identifier, file, false, null); + } + + @Override + public long getLastModified(final DataIdentifier identifier) + throws DataStoreException { + log("getLastModified " + identifier); + return timeMap.get(identifier); + } + + @Override + public void deleteRecord(final DataIdentifier identifier) + throws DataStoreException { + timeMap.remove(identifier); + data.remove(identifier); + } + + @Override + public Set deleteAllOlderThan(final long min) { + log("deleteAllOlderThan " + min); + Set tobeDeleted = new HashSet(); + for (Map.Entry entry : timeMap.entrySet()) { + DataIdentifier identifier = entry.getKey(); + long timestamp = entry.getValue(); + if (timestamp < min) { + tobeDeleted.add(identifier); + } + } + for (DataIdentifier identifier : tobeDeleted) { + timeMap.remove(identifier); + 
data.remove(identifier); + } + return tobeDeleted; + } + + @Override + public long getLength(final DataIdentifier identifier) + throws DataStoreException { + try { + return data.get(identifier).length; + } catch (Exception e) { + throw new DataStoreException(e); + } + } + + @Override + public boolean exists(final DataIdentifier identifier, final boolean touch) + throws DataStoreException { + boolean retVal = data.containsKey(identifier); + if (retVal && touch) { + timeMap.put(identifier, System.currentTimeMillis()); + } + return retVal; + } + + private void write(final DataIdentifier identifier, final File file, + final boolean async, final AsyncUploadCallback callback) + throws DataStoreException { + log("write " + identifier + " " + file.length()); + byte[] buffer = new byte[(int) file.length()]; + try { + if (async && callback == null) { + throw new IllegalArgumentException( + "callback parameter cannot be null"); + } + DataInputStream din = new DataInputStream(new FileInputStream(file)); + din.readFully(buffer); + din.close(); + data.put(identifier, buffer); + timeMap.put(identifier, System.currentTimeMillis()); + } catch (IOException e) { + if (async) { + callback.call(identifier, file, + AsyncUploadCallback.RESULT.ABORTED); + } + throw new DataStoreException(e); + } + if (async) { + callback.call(identifier, file, AsyncUploadCallback.RESULT.SUCCESS); + } + } + + /** + * Log a message if logging is enabled. 
+ * + * @param message + * the message + */ + private void log(final String message) { + // System.out.println(message); + } +} Propchange: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java?rev=1577127&view=auto ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java (added) +++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java Thu Mar 13 12:12:33 2014 @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.data; + +import org.apache.jackrabbit.core.data.Backend; +import org.apache.jackrabbit.core.data.CachingDataStore; + +/** + * A caching data store that uses the in-memory backend. 
+ */ +public class InMemoryDataStore extends CachingDataStore { + + @Override + protected Backend createBackend() { + return new InMemoryBackend(); + } + + @Override + protected String getMarkerFile() { + return "mem.init.done"; + } +} Propchange: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryDataStore.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java?rev=1577127&view=auto ============================================================================== --- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java (added) +++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java Thu Mar 13 12:12:33 2014 @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * An input stream that returns pseudo-random bytes.
 * <p>
 * The bytes come from a 48-bit linear congruential generator
 * ({@code state = (state * MUL + ADD) & MASK}), so two streams created with
 * the same seed and length produce exactly the same byte sequence. Bulk
 * reads and skips are deliberately limited to {@code maxReadBlockSize} bytes
 * per call, which forces callers to cope with short reads.
 * <p>
 * This class is not synchronized.
 */
public class RandomInputStream extends InputStream {

    private static final long MUL = 0x5DEECE66DL;
    private static final long ADD = 0xBL;
    private static final long MASK = (1L << 48) - 1;
    private static final int DEFAULT_MAX_READ_BLOCK_SIZE = 15;

    private final long initialSeed;
    private final long len;
    private final int maxReadBlockSize;
    private long markedState;
    private long pos;
    private long markedPos;
    private long state;

    /**
     * Creates a stream of {@code len} pseudo-random bytes using the default
     * maximum block size for bulk reads and skips.
     *
     * @param seed the seed that determines the byte sequence
     * @param len the total number of bytes the stream delivers
     */
    public RandomInputStream(long seed, long len) {
        this(seed, len, DEFAULT_MAX_READ_BLOCK_SIZE);
    }

    /**
     * Creates a stream of {@code len} pseudo-random bytes.
     *
     * @param seed the seed that determines the byte sequence
     * @param len the total number of bytes the stream delivers
     * @param maxReadBlockSize the largest number of bytes a single bulk
     *            read or skip call will process
     */
    public RandomInputStream(long seed, long len, int maxReadBlockSize) {
        this.initialSeed = seed;
        this.len = len;
        this.maxReadBlockSize = maxReadBlockSize;
        setSeed(seed);
        // The initial mark is position 0 with the seeded state; resetting
        // to it puts the stream at its start.
        reset();
    }

    /**
     * Compares two streams byte by byte until both are exhausted. Both
     * streams are wrapped in a {@link BufferedInputStream}; neither is
     * closed.
     *
     * @param a the first stream
     * @param b the second stream
     * @throws IOException if the streams differ in content or length (the
     *             message names the zero-based offset of the first
     *             difference), or if reading fails
     */
    public static void compareStreams(InputStream a, InputStream b) throws IOException {
        a = new BufferedInputStream(a);
        b = new BufferedInputStream(b);
        long pos = 0;
        while (true) {
            int x = a.read();
            int y = b.read();
            if (x != y) {
                throw new IOException("Incorrect byte at position " + pos + ": x=" + x + " y=" + y);
            }
            if (x == -1) {
                // both streams ended at the same offset
                break;
            }
            pos++;
        }
    }

    /**
     * Returns a string that looks like the constructor call which would
     * recreate this stream.
     */
    @Override
    public String toString() {
        return "new RandomInputStream(" + initialSeed + ", " + len + ")";
    }

    /**
     * Skips up to {@code n} bytes, limited by the remaining length and by
     * the maximum block size. The generator is advanced once per skipped
     * byte, so the bytes that follow are the same as if the skipped bytes
     * had been read and discarded.
     *
     * @param n the number of bytes the caller would like to skip
     * @return the number of bytes actually skipped; 0 if {@code n} is not
     *         positive or the stream is exhausted
     */
    @Override
    public long skip(long n) {
        int count = getReadBlock(n);
        for (int i = 0; i < count; i++) {
            next();
        }
        pos += count;
        return count;
    }

    /**
     * Limits a requested byte count to the remaining length and the maximum
     * block size; the result is never negative.
     */
    private int getReadBlock(long n) {
        long block = Math.min(Math.min(n, len - pos), maxReadBlockSize);
        // block is at most maxReadBlockSize here, so the cast cannot overflow
        return (int) Math.max(block, 0);
    }

    /**
     * Reads up to {@code len} bytes into {@code b}, limited by the remaining
     * length and by the maximum block size.
     *
     * @param b the destination buffer
     * @param off the index in {@code b} of the first byte written
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, 0 if {@code len} is 0, or -1 at the
     *         end of the stream
     */
    @Override
    public int read(byte[] b, int off, int len) {
        if (len == 0) {
            // a zero-length request must not be reported as end of stream
            return 0;
        }
        if (pos >= this.len) {
            return -1;
        }
        int count = getReadBlock(len);
        if (count == 0) {
            // reached for a negative len or a non-positive block size
            return -1;
        }
        for (int i = 0; i < count; i++) {
            b[off + i] = (byte) (next() & 255);
        }
        pos += count;
        return count;
    }

    /**
     * Reads into {@code b}; equivalent to {@code read(b, 0, b.length)}.
     *
     * @param b the destination buffer
     * @return the number of bytes read, or -1 at the end of the stream
     */
    @Override
    public int read(byte[] b) {
        return read(b, 0, b.length);
    }

    /**
     * Moves the position to the end of the stream, so that subsequent reads
     * return -1.
     */
    @Override
    public void close() {
        pos = len;
    }

    /** Stores the scrambled seed as the state that {@link #reset()} restores. */
    private void setSeed(long seed) {
        markedState = (seed ^ MUL) & MASK;
    }

    /** Advances the generator one step and returns the upper 32 bits of the 48-bit state. */
    private int next() {
        state = (state * MUL + ADD) & MASK;
        return (int) (state >>> (48 - 32));
    }

    /**
     * Returns to the position and generator state recorded by the last call
     * to {@link #mark(int)}, or to the start of the stream if no mark was
     * set.
     */
    @Override
    public void reset() {
        pos = markedPos;
        state = markedState;
    }

    /**
     * Reads the next byte.
     *
     * @return the next byte as a value from 0 to 255, or -1 at the end of
     *         the stream
     */
    @Override
    public int read() {
        if (pos >= len) {
            return -1;
        }
        pos++;
        return next() & 255;
    }

    /**
     * @return {@code true}; mark and reset are supported
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Records the current position and generator state for a later
     * {@link #reset()}.
     *
     * @param readlimit ignored
     */
    @Override
    public void mark(int readlimit) {
        markedPos = pos;
        markedState = state;
    }

}