camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-10448: File read lock - idempotent and change/rename should be possible
Date Mon, 07 Nov 2016 09:28:21 GMT
Repository: camel
Updated Branches:
  refs/heads/master 86f4cec71 -> 5643f5557


CAMEL-10448: File read lock - idempotent and change/rename should be possible


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5643f555
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5643f555
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5643f555

Branch: refs/heads/master
Commit: 5643f5557a9af9e073ee1ab982d488ecc88fa076
Parents: 86f4cec
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Nov 7 10:28:14 2016 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Nov 7 10:28:14 2016 +0100

----------------------------------------------------------------------
 .../component/file/GenericFileEndpoint.java     |   8 +-
 ...potentChangedRepositoryReadLockStrategy.java | 237 +++++++++++++++++++
 ...mpotentRenameRepositoryReadLockStrategy.java | 229 ++++++++++++++++++
 .../strategy/FileProcessStrategyFactory.java    |  38 +++
 .../FileIdempotentChangedReadLockTest.java      |  46 ++++
 .../FileIdempotentRenameReadLockTest.java       |  46 ++++
 6 files changed, 602 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index e251154..d9449cc 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -154,7 +154,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint
imple
     protected Comparator<Exchange> sortBy;
     @UriParam(label = "consumer,sort")
     protected boolean shuffle;
-    @UriParam(label = "consumer,lock", enums = "none,markerFile,fileLock,rename,changed,idempotent")
+    @UriParam(label = "consumer,lock", enums = "none,markerFile,fileLock,rename,changed,idempotent,idempotent-changed,idempotent-rename")
     protected String readLock = "none";
     @UriParam(label = "consumer,lock", defaultValue = "1000")
     protected long readLockCheckInterval = 1000;
@@ -796,6 +796,10 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint
imple
      *     <li>rename - rename is for using a try to rename the file as a test if we
can get exclusive read-lock.</li>
      *     <li>idempotent - (only for file component) idempotent is for using a idempotentRepository
as the read-lock.
      *     This allows to use read locks that supports clustering if the idempotent repository
implementation supports that.</li>
+     *     <li>idempotent-changed - (only for file component) idempotent-changed is
for using a idempotentRepository and changed as the combined read-lock.
+     *     This allows to use read locks that supports clustering if the idempotent repository
implementation supports that.</li>
+     *     <li>idempotent-rename - (only for file component) idempotent-rename is for
using a idempotentRepository and rename as the combined read-lock.
+     *     This allows to use read locks that supports clustering if the idempotent repository
implementation supports that.</li>
      * </ul>
      * Notice: The various read locks is not all suited to work in clustered mode, where
concurrent consumers on different nodes is competing
      * for the same files on a shared file system. The markerFile using a close to atomic
operation to create the empty marker file,
@@ -1247,7 +1251,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint
imple
         if (readLock != null) {
             params.put("readLock", readLock);
         }
-        if ("idempotent".equals(readLock)) {
+        if ("idempotent".equals(readLock) || "idempotent-changed".equals(readLock) || "idempotent-rename".equals(readLock))
{
             params.put("readLockIdempotentRepository", idempotentRepository);
         }
         if (readLockCheckInterval > 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
new file mode 100644
index 0000000..ab8e31f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file read lock that uses an {@link IdempotentRepository} and {@link FileChangedExclusiveReadLockStrategy
changed} as the lock strategy.
+ * This allows to plugin and use existing idempotent repositories that for example supports
clustering.
+ * The other read lock strategies that are using marker files or file locks, are not guaranteed
to work in clustered setup with various platform and file systems.
+ */
+public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupport implements
GenericFileExclusiveReadLockStrategy<File>, CamelContextAware {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentChangedRepositoryReadLockStrategy.class);
+
+    private final FileChangedExclusiveReadLockStrategy changed;
+    private GenericFileEndpoint<File> endpoint;
+    private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
+    private CamelContext camelContext;
+    private IdempotentRepository<String> idempotentRepository;
+    private boolean removeOnRollback = true;
+    private boolean removeOnCommit;
+
+    public FileIdempotentChangedRepositoryReadLockStrategy() {
+        this.changed = new FileChangedExclusiveReadLockStrategy();
+        // no need to use marker file as idempotent ensures exclusive read-lock
+        this.changed.setMarkerFiler(false);
+        this.changed.setDeleteOrphanLockFiles(false);
+    }
+
+    @Override
+    public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File>
endpoint) throws Exception {
+        this.endpoint = endpoint;
+        LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository,
endpoint);
+
+        changed.prepareOnStartup(operations, endpoint);
+    }
+
+    @Override
+    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        // in clustered mode then another node may have processed the file so we must check
here again if the file exists
+        File path = file.getFile();
+        if (!path.exists()) {
+            return false;
+        }
+
+        // check if we can begin on this file
+        String key = asKey(file);
+        boolean answer = idempotentRepository.add(key);
+        if (!answer) {
+            // another node is processing the file so skip
+            CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip
the file: " + file);
+        }
+
+        if (answer) {
+            // if we acquired during idempotent then check changed also
+            answer = changed.acquireExclusiveReadLock(operations, file, exchange);
+            if (!answer) {
+                // remove from idempontent as we did not acquire it from changed
+                idempotentRepository.remove(key);
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        changed.releaseExclusiveReadLockOnAbort(operations, file, exchange);
+    }
+
+    @Override
+    public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        String key = asKey(file);
+        if (removeOnRollback) {
+            idempotentRepository.remove(key);
+        } else {
+            // okay we should not remove then confirm it instead
+            idempotentRepository.confirm(key);
+        }
+
+        changed.releaseExclusiveReadLockOnRollback(operations, file, exchange);
+    }
+
+    @Override
+    public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        String key = asKey(file);
+        if (removeOnCommit) {
+            idempotentRepository.remove(key);
+        } else {
+            // confirm on commit
+            idempotentRepository.confirm(key);
+        }
+
+        changed.releaseExclusiveReadLockOnCommit(operations, file, exchange);
+    }
+
+    public void setTimeout(long timeout) {
+        changed.setTimeout(timeout);
+    }
+
+    public void setCheckInterval(long checkInterval) {
+        changed.setCheckInterval(checkInterval);
+    }
+
+    public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
+        this.readLockLoggingLevel = readLockLoggingLevel;
+        changed.setReadLockLoggingLevel(readLockLoggingLevel);
+    }
+
+    public void setMarkerFiler(boolean markerFile) {
+        // we do not use marker files
+    }
+
+    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+        // we do not use marker files
+    }
+
+    public void setMinLength(long minLength) {
+        changed.setMinLength(minLength);
+    }
+
+    public void setMinAge(long minAge) {
+        changed.setMinAge(minAge);
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * The idempotent repository to use as the store for the read locks.
+     */
+    public IdempotentRepository<String> getIdempotentRepository() {
+        return idempotentRepository;
+    }
+
+    /**
+     * The idempotent repository to use as the store for the read locks.
+     */
+    public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository)
{
+        this.idempotentRepository = idempotentRepository;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a rollback.
+     * <p/>
+     * By default this is true.
+     */
+    public boolean isRemoveOnRollback() {
+        return removeOnRollback;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a rollback.
+     * <p/>
+     * By default this is true.
+     */
+    public void setRemoveOnRollback(boolean removeOnRollback) {
+        this.removeOnRollback = removeOnRollback;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public boolean isRemoveOnCommit() {
+        return removeOnCommit;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public void setRemoveOnCommit(boolean removeOnCommit) {
+        this.removeOnCommit = removeOnCommit;
+    }
+
+    protected String asKey(GenericFile<File> file) {
+        // use absolute file path as default key, but evaluate if an expression key was configured
+        String key = file.getAbsoluteFilePath();
+        if (endpoint.getIdempotentKey() != null) {
+            Exchange dummy = endpoint.createExchange(file);
+            key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
+        }
+        return key;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "camelContext", this);
+        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
+
+        // ensure the idempotent repository is added as a service so CamelContext will stop
the repo when it shutdown itself
+        camelContext.addService(idempotentRepository, true);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
new file mode 100644
index 0000000..a99ee23
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileEndpoint;
+import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file read lock that uses an {@link IdempotentRepository} and {@link FileRenameExclusiveReadLockStrategy
rename} as the lock strategy.
+ * This allows to plugin and use existing idempotent repositories that for example supports
clustering.
+ * The other read lock strategies that are using marker files or file locks, are not guaranteed
to work in clustered setup with various platform and file systems.
+ */
+public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSupport implements
GenericFileExclusiveReadLockStrategy<File>, CamelContextAware {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRenameRepositoryReadLockStrategy.class);
+
+    private final FileRenameExclusiveReadLockStrategy rename;
+    private GenericFileEndpoint<File> endpoint;
+    private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
+    private CamelContext camelContext;
+    private IdempotentRepository<String> idempotentRepository;
+    private boolean removeOnRollback = true;
+    private boolean removeOnCommit;
+
+    public FileIdempotentRenameRepositoryReadLockStrategy() {
+        this.rename = new FileRenameExclusiveReadLockStrategy();
+        // no need to use marker file as idempotent ensures exclusive read-lock
+        this.rename.setMarkerFiler(false);
+        this.rename.setDeleteOrphanLockFiles(false);
+    }
+
+    @Override
+    public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File>
endpoint) throws Exception {
+        this.endpoint = endpoint;
+        LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository,
endpoint);
+
+        rename.prepareOnStartup(operations, endpoint);
+    }
+
+    @Override
+    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        // in clustered mode then another node may have processed the file so we must check
here again if the file exists
+        File path = file.getFile();
+        if (!path.exists()) {
+            return false;
+        }
+
+        // check if we can begin on this file
+        String key = asKey(file);
+        boolean answer = idempotentRepository.add(key);
+        if (!answer) {
+            // another node is processing the file so skip
+            CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip
the file: " + file);
+        }
+
+        if (answer) {
+            // if we acquired during idempotent then check rename also
+            answer = rename.acquireExclusiveReadLock(operations, file, exchange);
+            if (!answer) {
+                // remove from idempontent as we did not acquire it from changed
+                idempotentRepository.remove(key);
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        rename.releaseExclusiveReadLockOnAbort(operations, file, exchange);
+    }
+
+    @Override
+    public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        String key = asKey(file);
+        if (removeOnRollback) {
+            idempotentRepository.remove(key);
+        } else {
+            // okay we should not remove then confirm it instead
+            idempotentRepository.confirm(key);
+        }
+
+        rename.releaseExclusiveReadLockOnRollback(operations, file, exchange);
+    }
+
+    @Override
+    public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+        String key = asKey(file);
+        if (removeOnCommit) {
+            idempotentRepository.remove(key);
+        } else {
+            // confirm on commit
+            idempotentRepository.confirm(key);
+        }
+
+        rename.releaseExclusiveReadLockOnCommit(operations, file, exchange);
+    }
+
+    public void setTimeout(long timeout) {
+        rename.setTimeout(timeout);
+    }
+
+    public void setCheckInterval(long checkInterval) {
+        rename.setCheckInterval(checkInterval);
+    }
+
+    public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
+        this.readLockLoggingLevel = readLockLoggingLevel;
+        rename.setReadLockLoggingLevel(readLockLoggingLevel);
+    }
+
+    public void setMarkerFiler(boolean markerFile) {
+        // we do not use marker files
+    }
+
+    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
+        // we do not use marker files
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * The idempotent repository to use as the store for the read locks.
+     */
+    public IdempotentRepository<String> getIdempotentRepository() {
+        return idempotentRepository;
+    }
+
+    /**
+     * The idempotent repository to use as the store for the read locks.
+     */
+    public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository)
{
+        this.idempotentRepository = idempotentRepository;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a rollback.
+     * <p/>
+     * By default this is true.
+     */
+    public boolean isRemoveOnRollback() {
+        return removeOnRollback;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a rollback.
+     * <p/>
+     * By default this is true.
+     */
+    public void setRemoveOnRollback(boolean removeOnRollback) {
+        this.removeOnRollback = removeOnRollback;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public boolean isRemoveOnCommit() {
+        return removeOnCommit;
+    }
+
+    /**
+     * Whether to remove the file from the idempotent repository when doing a commit.
+     * <p/>
+     * By default this is false.
+     */
+    public void setRemoveOnCommit(boolean removeOnCommit) {
+        this.removeOnCommit = removeOnCommit;
+    }
+
+    protected String asKey(GenericFile<File> file) {
+        // use absolute file path as default key, but evaluate if an expression key was configured
+        String key = file.getAbsoluteFilePath();
+        if (endpoint.getIdempotentKey() != null) {
+            Exchange dummy = endpoint.createExchange(file);
+            key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
+        }
+        return key;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "camelContext", this);
+        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
+
+        // ensure the idempotent repository is added as a service so CamelContext will stop
the repo when it shutdown itself
+        camelContext.addService(idempotentRepository, true);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
index f5348b0..3d74ea2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
@@ -143,6 +143,44 @@ public final class FileProcessStrategyFactory {
                     readLockStrategy.setIdempotentRepository(repo);
                 }
                 strategy = readLockStrategy;
+            } else if ("idempotent-changed".equals(readLock)) {
+                FileIdempotentChangedRepositoryReadLockStrategy readLockStrategy = new FileIdempotentChangedRepositoryReadLockStrategy();
+                Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
+                if (readLockRemoveOnRollback != null) {
+                    readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
+                }
+                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+                if (readLockRemoveOnCommit != null) {
+                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+                }
+                IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
+                if (repo != null) {
+                    readLockStrategy.setIdempotentRepository(repo);
+                }
+                Long minLength = (Long) params.get("readLockMinLength");
+                if (minLength != null) {
+                    readLockStrategy.setMinLength(minLength);
+                }
+                Long minAge = (Long) params.get("readLockMinAge");
+                if (null != minAge) {
+                    readLockStrategy.setMinAge(minAge);
+                }
+                strategy = readLockStrategy;
+            } else if ("idempotent-rename".equals(readLock)) {
+                FileIdempotentRenameRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRenameRepositoryReadLockStrategy();
+                Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback");
+                if (readLockRemoveOnRollback != null) {
+                    readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);
+                }
+                Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit");
+                if (readLockRemoveOnCommit != null) {
+                    readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit);
+                }
+                IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository");
+                if (repo != null) {
+                    readLockStrategy.setIdempotentRepository(repo);
+                }
+                strategy = readLockStrategy;
             }
 
             if (strategy != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java
b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java
new file mode 100644
index 0000000..2f60395
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentChangedReadLockTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class FileIdempotentChangedReadLockTest extends FileIdempotentReadLockTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/changed/in?readLock=idempotent-changed&idempotentRepository=#myRepo")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // we are in progress
+                            int size = myRepo.getCacheSize();
+                            assertTrue(size == 1 || size == 2);
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5643f555/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java
b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java
new file mode 100644
index 0000000..3aa89f0
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileIdempotentRenameReadLockTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class FileIdempotentRenameReadLockTest extends FileIdempotentReadLockTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/changed/in?readLock=idempotent-rename&idempotentRepository=#myRepo")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // we are in progress
+                            int size = myRepo.getCacheSize();
+                            assertTrue(size == 1 || size == 2);
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+}


Mime
View raw message