camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1244980 [1/3] - in /camel/trunk: components/ components/camel-mongodb/ components/camel-mongodb/src/ components/camel-mongodb/src/main/ components/camel-mongodb/src/main/java/ components/camel-mongodb/src/main/java/org/ components/camel-mo...
Date Thu, 16 Feb 2012 14:09:57 GMT
Author: davsclaus
Date: Thu Feb 16 14:09:55 2012
New Revision: 1244980

URL: http://svn.apache.org/viewvc?rev=1244980&view=rev
Log:
CAMEL-4878: New camel-mongodb component. Thanks to Raul for the patch.

Added:
    camel/trunk/components/camel-mongodb/   (with props)
    camel/trunk/components/camel-mongodb/pom.xml   (with props)
    camel/trunk/components/camel-mongodb/src/
    camel/trunk/components/camel-mongodb/src/main/
    camel/trunk/components/camel-mongodb/src/main/java/
    camel/trunk/components/camel-mongodb/src/main/java/org/
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/converters/
    camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/converters/MongoDbBasicConverters.java   (with props)
    camel/trunk/components/camel-mongodb/src/main/resources/
    camel/trunk/components/camel-mongodb/src/main/resources/LICENSE.txt   (with props)
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/component/
    camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/component/mongodb
    camel/trunk/components/camel-mongodb/src/main/resources/NOTICE.txt   (with props)
    camel/trunk/components/camel-mongodb/src/test/
    camel/trunk/components/camel-mongodb/src/test/java/
    camel/trunk/components/camel-mongodb/src/test/java/org/
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/AbstractMongoDbTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbConversionsTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbExceptionHandlingTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbFindOperationTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumerTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbWriteConcernsTest.java   (with props)
    camel/trunk/components/camel-mongodb/src/test/resources/
    camel/trunk/components/camel-mongodb/src/test/resources/log4j.properties   (with props)
    camel/trunk/components/camel-mongodb/src/test/resources/mongodb.test.properties   (with props)
    camel/trunk/components/camel-mongodb/src/test/resources/org/
    camel/trunk/components/camel-mongodb/src/test/resources/org/apache/
    camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/
    camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/component/
    camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/component/mongodb/
    camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/component/mongodb/mongoComponentTest.xml   (with props)
Modified:
    camel/trunk/components/pom.xml
    camel/trunk/parent/pom.xml

Propchange: camel/trunk/components/camel-mongodb/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Feb 16 14:09:55 2012
@@ -0,0 +1,17 @@
+.pmd
+.checkstyle
+.ruleset
+target
+.settings
+.classpath
+.project
+.wtpmodules
+prj.el
+.jdee_classpath
+.jdee_sources
+velocity.log
+eclipse-classes
+*.ipr
+*.iml
+*.iws
+*.idea

Added: camel/trunk/components/camel-mongodb/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/pom.xml?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/pom.xml (added)
+++ camel/trunk/components/camel-mongodb/pom.xml Thu Feb 16 14:09:55 2012
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>components</artifactId>
+    <groupId>org.apache.camel</groupId>
+    <version>2.10-SNAPSHOT</version>
+    <relativePath>../</relativePath>
+  </parent>
+
+  <groupId>org.apache.camel</groupId>
+  <artifactId>camel-mongodb</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: MongoDB</name>
+  <description>Camel MongoDB component</description>
+
+  <properties>
+    <camel.osgi.export.pkg>org.apache.camel.component.mongodb.*</camel.osgi.export.pkg>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+
+    <!-- MongoDB driver dependency -->
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>${mongo-java-driver-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${jackson-version}</version>
+      <optional>true</optional>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

Propchange: camel/trunk/components/camel-mongodb/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/pom.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-mongodb/pom.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+/**
+ * Checked exception used by the Camel MongoDB component to report its own failures.
+ */
+public class CamelMongoDbException extends Exception {
+
+    private static final long serialVersionUID = 7834484945432331909L;
+
+    /**
+     * Creates an exception that only carries a description of the failure.
+     *
+     * @param message description of the failure
+     */
+    public CamelMongoDbException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps another throwable without a message of its own.
+     *
+     * @param cause the underlying failure
+     */
+    public CamelMongoDbException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with both a description and the underlying failure.
+     *
+     * @param message description of the failure
+     * @param cause   the underlying failure
+     */
+    public CamelMongoDbException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.Map;
+
+import com.mongodb.Mongo;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.CamelContextHelper;
+
+/**
+ * Camel component that creates {@link MongoDbEndpoint} instances, each bound to a {@link Mongo}
+ * connection resolved through the Camel context.
+ */
+public class MongoDbComponent extends DefaultComponent {
+
+    /**
+     * Returns the given throwable as a {@link CamelMongoDbException}: it is handed back unchanged
+     * when it already is one, otherwise it becomes the cause of a new instance.
+     *
+     * @param t the throwable to convert
+     * @return {@code t} itself or a new exception wrapping it
+     */
+    public static CamelMongoDbException wrapInCamelMongoDbException(Throwable t) {
+        return t instanceof CamelMongoDbException ? (CamelMongoDbException) t : new CamelMongoDbException(t);
+    }
+
+    /**
+     * Builds the endpoint for the given URI. The remaining part of the URI is used as the name
+     * under which the {@link Mongo} instance is looked up via {@link CamelContextHelper}.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  name of the {@link Mongo} instance to look up
+     * @param parameters endpoint options; the resolved connection is added under "mongoConnection"
+     * @return the configured endpoint
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        Mongo connection = CamelContextHelper.mandatoryLookup(getCamelContext(), remaining, Mongo.class);
+        Endpoint mongoEndpoint = new MongoDbEndpoint(uri, this);
+
+        // hand the connection over as a regular option so setProperties() injects it with the rest
+        parameters.put("mongoConnection", connection);
+        setProperties(mongoEndpoint, parameters);
+        return mongoEndpoint;
+    }
+
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+public final class MongoDbConstants {
+    
+    public static final String OPERATION_HEADER = "CamelMongoDbOperation";    
+    public static final String RESULT_TOTAL_SIZE = "CamelMongoDbResultTotalSize";
+    public static final String RESULT_PAGE_SIZE = "CamelMongoDbResultPageSize";
+    public static final String FIELDS_FILTER = "CamelMongoDbFieldsFilter";
+    public static final String BATCH_SIZE = "CamelMongoDbBatchSize";
+    public static final String NUM_TO_SKIP = "CamelMongoDbNumToSkip";
+    public static final String INSERT_RECORDS_AFFECTED = "CamelMongoDbInsertRecordsAffected";
+    public static final String LAST_ERROR = "CamelMongoDbLastError";
+    public static final String MULTIUPDATE = "CamelMongoDbMultiUpdate";
+    public static final String UPSERT = "CamelMongoDbUpsert";
+    public static final String RECORDS_AFFECTED = "CamelMongoDbRecordsAffected";
+    public static final String SORT_BY = "CamelMongoDbSortBy";
+    public static final String DATABASE = "CamelMongoDbDatabase";
+    public static final String COLLECTION = "CamelMongoDbCollection";
+    public static final String WRITECONCERN = "CamelMongoDbWriteConcern";
+    public static final String LIMIT = "CamelMongoDbLimit";
+    public static final String FROM_TAILABLE = "CamelMongoDbTailable";
+    
+    private MongoDbConstants() { }
+    
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+/**
+ * Consumer flavours that a {@link MongoDbEndpoint} can create.
+ */
+public enum MongoDbConsumerType {
+
+    /** Tailable cursor consumer; the endpoint falls back to this type when none is configured. */
+    tailable
+
+    // further consumer types are expected in future versions
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,503 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.ReadPreference;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ObjectHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a MongoDb endpoint. 
+ * It is responsible for creating {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
+ * It accepts a number of options to customise the behaviour of consumers and producers.
+ */
+public class MongoDbEndpoint extends DefaultEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbEndpoint.class);
+    // connection from which initializeConnection() obtains the DB and the collection
+    private Mongo mongoConnection;
+    // target database and collection names; initializeConnection() rejects a null value for either
+    private String database;
+    private String collection;
+    private MongoDbOperation operation;
+    // when false, initializeConnection() fails if the collection is missing; createConsumer() forces it to false
+    private boolean createCollection = true;
+    private boolean invokeGetLastError; // = false
+    // writeConcern and writeConcernRef are mutually exclusive; doStart() rejects an endpoint that has both
+    private WriteConcern writeConcern;
+    private WriteConcern writeConcernRef;
+    private ReadPreference readPreference;
+    private boolean dynamicity; // = false
+    // tailable cursor consumer by default: left null here, createConsumer() substitutes tailable
+    private MongoDbConsumerType consumerType;
+    // validateOptions() treats any value other than 1000L as explicitly configured
+    private long cursorRegenerationDelay = 1000L;
+    private String tailTrackIncreasingField;
+    
+    // persistent tail tracking
+    private boolean persistentTailTracking; // = false;
+    private String persistentId;
+    private String tailTrackDb;
+    private String tailTrackCollection;
+    private String tailTrackField;
+    
+    private MongoDbTailTrackingConfig tailTrackingConfig;
+    
+    // both handles are assigned by initializeConnection()
+    private DBCollection dbCollection;
+    private DB db;
+
+    // ======= Constructors ===============================================
+
+    /**
+     * Creates an endpoint with no URI and no owning component.
+     */
+    public MongoDbEndpoint() {
+    }
+
+    /**
+     * Creates an endpoint for the given URI that belongs to the given component.
+     *
+     * @param uri       the endpoint URI
+     * @param component the component creating this endpoint
+     */
+    public MongoDbEndpoint(String uri, MongoDbComponent component) {
+        super(uri, component);
+    }
+
+    /**
+     * Creates an endpoint from its URI only.
+     * NOTE(review): the deprecation warning is suppressed, so the superclass constructor used here
+     * is presumably deprecated - confirm before relying on this constructor.
+     *
+     * @param endpointUri the endpoint URI
+     */
+    @SuppressWarnings("deprecation")
+    public MongoDbEndpoint(String endpointUri) {
+        super(endpointUri);
+    }
+
+    // ======= Implementation methods =====================================
+    
+    /**
+     * Creates the producer for this endpoint, after validating the options against the producer
+     * role and binding the endpoint to its database and collection.
+     *
+     * @return a new {@link MongoDbProducer} backed by this endpoint
+     * @throws Exception if the options are invalid for a producer or the connection cannot be initialised
+     */
+    public Producer createProducer() throws Exception {
+        validateOptions('P');
+        initializeConnection();
+        return new MongoDbProducer(this);
+    }
+
+    /**
+     * Creates the consumer for this endpoint. The options are validated against the consumer role,
+     * collection creation is switched off, and the endpoint is bound to its database and collection
+     * before the consumer is instantiated.
+     *
+     * @param processor the processor that receives the consumed exchanges
+     * @return a new {@link MongoDbTailableCursorConsumer}
+     * @throws Exception if validation or connection initialisation fails, or if the consumer type is unsupported
+     */
+    public Consumer createConsumer(Processor processor) throws Exception {
+        validateOptions('C');
+        // a consumer never creates the collection it reads from
+        createCollection = false;
+        initializeConnection();
+
+        // fall back to the tailable cursor consumer when no type was configured
+        if (consumerType == null) {
+            consumerType = MongoDbConsumerType.tailable;
+        }
+
+        if (consumerType != MongoDbConsumerType.tailable) {
+            throw new CamelMongoDbException("Consumer type not supported: " + consumerType);
+        }
+        return new MongoDbTailableCursorConsumer(this, processor);
+    }
+
+    /**
+     * Checks that the configured options are consistent with the role the endpoint is about to play.
+     * <p>
+     * Validation is best effort: options that have a default are compared against that default, so
+     * an option explicitly set to its default value goes unnoticed.
+     *
+     * @param role {@code 'P'} when a producer is being created, {@code 'C'} for a consumer
+     * @throws IllegalArgumentException if an option of the opposite role is set, if a tailable
+     *                                  consumer lacks a mandatory option, or if the role is unknown
+     */
+    private void validateOptions(char role) throws IllegalArgumentException {
+        if (role == 'P') {
+            if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb)
+                    || !ObjectHelper.isEmpty(tailTrackCollection) || !ObjectHelper.isEmpty(tailTrackField) || cursorRegenerationDelay != 1000L) {
+                throw new IllegalArgumentException("consumerType, tailTracking, cursorRegenerationDelay options cannot appear on a producer endpoint");
+            }
+        } else if (role == 'C') {
+            if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null
+                    || readPreference != null || dynamicity || invokeGetLastError) {
+                throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError "
+                        + "options cannot appear on a consumer endpoint");
+            }
+
+            // createConsumer() applies the tailable default only after this validation has run, so
+            // an unset consumerType must be treated as tailable here; otherwise the checks below
+            // would be skipped for the default consumer type
+            if (consumerType == null || consumerType == MongoDbConsumerType.tailable) {
+                if (tailTrackIncreasingField == null) {
+                    throw new IllegalArgumentException("tailTrackIncreasingField option must be set for tailable cursor MongoDB consumer endpoint");
+                }
+                if (persistentTailTracking && (ObjectHelper.isEmpty(persistentId))) {
+                    throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
+                }
+            }
+
+        } else {
+            throw new IllegalArgumentException("Unknown endpoint role");
+        }
+    }
+
+    /**
+     * Reports this endpoint as a singleton.
+     *
+     * @return always {@code true}
+     */
+    public boolean isSingleton() {
+        return true;
+    }
+    
+    /**
+     * Binds this endpoint to its MongoDB database and collection, using the Mongo object provided
+     * to the endpoint.
+     *
+     * @throws CamelMongoDbException if the database or collection name is missing, if the database
+     *                               cannot be obtained, or if the collection does not exist while
+     *                               createCollection is false
+     */
+    public void initializeConnection() throws CamelMongoDbException {
+        LOG.info("Initialising MongoDb endpoint: {}", this);
+        if (database == null || collection == null) {
+            throw new CamelMongoDbException("Missing required endpoint configuration: database and/or collection");
+        }
+        db = mongoConnection.getDB(database);
+        if (db == null) {
+            throw new CamelMongoDbException("Could not initialise MongoDbComponent. Database " + database + " does not exist.");
+        }
+        // only insist on an existing collection when the endpoint is not allowed to create it
+        if (!createCollection && !db.collectionExists(collection)) {
+            throw new CamelMongoDbException("Could not initialise MongoDbComponent. Collection " + collection
+                    + " does not exist and createCollection is false.");
+        }
+        dbCollection = db.getCollection(collection);
+
+        LOG.info("MongoDb component initialised and endpoint bound to MongoDB collection with the following parameters. Address list: {}, Db: {}, Collection: {}",
+                new Object[] {mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
+    }
+
+    /**
+     * Applies validation logic specific to this endpoint type. If everything succeeds, continues initialization
+     */
+    @Override
+    protected void doStart() throws Exception {
+        if (writeConcern != null && writeConcernRef != null) {
+            LOG.error("Cannot set both writeConcern and writeConcernRef at the same time. Respective values: {}, {}. "
+                    + "Aborting initialization.", new Object[] {writeConcern, writeConcernRef});
+            throw new IllegalArgumentException("Cannot set both writeConcern and writeConcernRef at the same time on MongoDB endpoint");
+        }
+
+        setWriteReadOptionsOnConnection();
+        super.doStart();
+    }
+
+    /**
+     * Wraps a MongoDB object in a new exchange. The object becomes the body of the in message,
+     * which also carries the database name, the collection name and a tailable flag set to true
+     * as headers.
+     *
+     * @param dbObj the MongoDB object to use as the message body
+     * @return a new exchange whose in message holds the object
+     */
+    public Exchange createMongoDbExchange(DBObject dbObj) {
+        Message in = new DefaultMessage();
+        in.setHeader(MongoDbConstants.DATABASE, database);
+        in.setHeader(MongoDbConstants.COLLECTION, collection);
+        in.setHeader(MongoDbConstants.FROM_TAILABLE, true);
+        in.setBody(dbObj);
+
+        Exchange answer = new DefaultExchange(getCamelContext(), getExchangePattern());
+        answer.setIn(in);
+        return answer;
+    }
+    
+    /**
+     * Pushes the configured write concern and read preference down to the Mongo connection.
+     * writeConcern takes precedence over writeConcernRef; options left unset are not touched.
+     */
+    private void setWriteReadOptionsOnConnection() {
+        WriteConcern effectiveWriteConcern = writeConcern != null ? writeConcern : writeConcernRef;
+        if (effectiveWriteConcern != null) {
+            mongoConnection.setWriteConcern(effectiveWriteConcern);
+        }
+
+        if (readPreference != null) {
+            mongoConnection.setReadPreference(readPreference);
+        }
+    }
+    
+    
+    // ======= Getters and setters ===============================================
+    
+    /**
+     * Sets the name of the MongoDB collection to bind to this endpoint.
+     * A collection name is required: {@link #initializeConnection()} fails when it is null.
+     * @param collection collection name
+     */
+    public void setCollection(String collection) {
+        this.collection = collection;
+    }
+
+    /**
+     * Returns the configured collection name.
+     * @return the collection name, or null if none has been set
+     */
+    public String getCollection() {
+        return collection;
+    }
+
+    /**
+     * Sets the operation this endpoint will execute against MongoDB. For possible values, see {@link MongoDbOperation}.
+     * @param operation name of the operation as per catalogued values
+     * @throws CamelMongoDbException if {@link MongoDbOperation#valueOf(String)} rejects the name with an
+     *                               IllegalArgumentException; that exception is kept as the cause
+     */
+    public void setOperation(String operation) throws CamelMongoDbException {
+        try {
+            this.operation = MongoDbOperation.valueOf(operation);
+        } catch (IllegalArgumentException e) {
+            throw new CamelMongoDbException("Operation not supported", e);
+        }
+    }
+
+    /**
+     * Returns the configured operation.
+     * @return the operation, or null if none has been set
+     */
+    public MongoDbOperation getOperation() {
+        return operation;
+    }
+
+    /**
+     * Sets the name of the MongoDB database to target.
+     * The value is stored as given; this setter performs no validation.
+     * @param database name of the MongoDB database
+     */
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    /**
+     * @return the database name configured on this endpoint
+     */
+    public String getDatabase() {
+        return database;
+    }
+
+    /**
+     * Create collection during initialisation if it doesn't exist. Default is true.
+     * This setter only records the flag; the creation itself happens elsewhere in the endpoint.
+     * @param createCollection true or false
+     */
+    public void setCreateCollection(boolean createCollection) {
+        this.createCollection = createCollection;
+    }
+
+    /**
+     * @return the value of the createCollection flag
+     */
+    public boolean isCreateCollection() {
+        return createCollection;
+    }
+
+    /**
+     * Returns the {@link DB} handle held by this endpoint. No setter is exposed alongside it in this section,
+     * so it is presumably populated during endpoint initialisation (confirm against the initialisation code).
+     * @return the database handle, possibly null if not yet initialised
+     */
+    public DB getDb() {
+        return db;
+    }
+
+    /**
+     * Returns the {@link DBCollection} handle held by this endpoint. No setter is exposed alongside it in this
+     * section, so it is presumably populated during endpoint initialisation (confirm against the initialisation code).
+     * @return the collection handle, possibly null if not yet initialised
+     */
+    public DBCollection getDbCollection() {
+        return dbCollection;
+    }
+    
+    /**
+     * Sets the Mongo instance that represents the backing connection.
+     * The instance is stored as given; write concern and read preference are applied to it separately.
+     * @param mongoConnection the connection to the database
+     */
+    public void setMongoConnection(Mongo mongoConnection) {
+        this.mongoConnection = mongoConnection;
+    }
+
+    /**
+     * @return the Mongo instance backing this endpoint
+     */
+    public Mongo getMongoConnection() {
+        return mongoConnection;
+    }
+
+    /**
+     * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones.
+     * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)} method.
+     * <p/>
+     * NOTE(review): the producer in this component checks the result of {@code WriteConcern.valueOf} for null,
+     * which suggests an unrecognised name yields null rather than an exception. If so, an unknown name silently
+     * leaves this endpoint without a write concern - confirm against the driver documentation.
+     * @param writeConcern the standard name of the WriteConcern
+     * @see <a href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible options</a>
+     */
+    public void setWriteConcern(String writeConcern) {
+        this.writeConcern = WriteConcern.valueOf(writeConcern);
+    }
+
+    /**
+     * @return the write concern resolved by {@link #setWriteConcern(String)}, or null if none was set
+     */
+    public WriteConcern getWriteConcern() {
+        return writeConcern;
+    }
+
+    /**
+     * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with every operation. By default, MongoDB does not wait
+     * for the write operation to occur before returning. If set to true, each exchange will only return after the write operation
+     * has actually occurred in MongoDB.
+     * This setter only records the flag; it is consulted when write results are processed.
+     * @param invokeGetLastError true or false
+     */
+    public void setInvokeGetLastError(boolean invokeGetLastError) {
+        this.invokeGetLastError = invokeGetLastError;
+    }
+
+    /**
+     * @return the value of the invokeGetLastError flag
+     */
+    public boolean isInvokeGetLastError() {
+        return invokeGetLastError;
+    }
+
+    /**
+     * Resolves a custom {@link WriteConcern} bean from the Camel Registry and stores it for write operations on MongoDB.
+     * Standard WriteConcerns can be selected by key instead through {@link #setWriteConcern(String) setWriteConcern}.
+     * @param writeConcernRef the name of the bean in the registry that represents the WriteConcern to use
+     * @throws IllegalArgumentException if the registry holds no WriteConcern under the given name
+     */
+    public void setWriteConcernRef(String writeConcernRef) {
+        WriteConcern resolved = this.getCamelContext().getRegistry().lookup(writeConcernRef, WriteConcern.class);
+        if (resolved != null) {
+            this.writeConcernRef = resolved;
+            return;
+        }
+
+        // nothing registered under that name: report it and refuse the configuration
+        LOG.error("Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the "
+                + "provided bean name ({}) is correct. Aborting initialization.", writeConcernRef);
+        throw new IllegalArgumentException("Camel MongoDB component could not find the WriteConcern in the Registry");
+    }
+
+    /**
+     * @return the write concern resolved from the registry by {@link #setWriteConcernRef(String)}, or null if none was set
+     */
+    public WriteConcern getWriteConcernRef() {
+        return writeConcernRef;
+    }
+
+    /** 
+     * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read preferences set directly on the connection will be
+     * overridden by this setting.
+     * @param readPreference the bean name of the read preference to set
+     */
+    public void setReadPreference(String readPreference) {
+        Class<?>[] innerClasses = ReadPreference.class.getDeclaredClasses();
+        for (Class<?> inClass : innerClasses) {
+            if (inClass.getSuperclass() == ReadPreference.class && inClass.getName().equals(readPreference)) {
+                try {
+                    this.readPreference = (ReadPreference) inClass.getConstructor((Class<?>) null).newInstance((Object[]) null);
+                } catch (Exception e) {
+                    continue;
+                }
+                break;
+            }
+        }
+        
+        LOG.error("Could not resolve specified ReadPreference of type {}. Read preferences are resolved from inner " 
+                + "classes of com.mongodb.ReadPreference.", readPreference);
+        throw new IllegalArgumentException("MongoDB endpoint could not resolve specified ReadPreference");
+    }
+    
+    /**
+     * @return the read preference resolved by {@link #setReadPreference(String)}, or null if none was set
+     */
+    public ReadPreference getReadPreference() {
+        return readPreference;
+    }
+
+    /**
+     * Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the
+     * headers of the incoming message ({@link MongoDbConstants#DATABASE} and {@link MongoDbConstants#COLLECTION}).
+     * Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI.
+     * It is disabled by default to boost performance. Enabling it will take a minimal performance hit.
+     * @see MongoDbConstants#DATABASE
+     * @see MongoDbConstants#COLLECTION
+     * @param dynamicity true or false indicating whether target database and collection should be calculated dynamically based on message headers.
+     */
+    public void setDynamicity(boolean dynamicity) {
+        this.dynamicity = dynamicity;
+    }
+
+    /**
+     * @return the value of the dynamicity flag
+     */
+    public boolean isDynamicity() {
+        return dynamicity;
+    }
+
+    /**
+     * Reserved for future use, when more consumer types are supported.
+     * The key is resolved with {@link MongoDbConsumerType#valueOf(String)}, so it has to match an enum constant exactly.
+     * @param consumerType key of the consumer type
+     * @throws CamelMongoDbException if the key does not correspond to any {@link MongoDbConsumerType} constant
+     */
+    public void setConsumerType(String consumerType) throws CamelMongoDbException {
+        try {
+            this.consumerType = MongoDbConsumerType.valueOf(consumerType);
+        } catch (IllegalArgumentException e) {
+            // translate the enum lookup failure into the component's own exception type, keeping the cause
+            throw new CamelMongoDbException("Consumer type not supported", e);
+        }
+    }
+
+    /**
+     * @return the consumer type configured on this endpoint
+     */
+    public MongoDbConsumerType getConsumerType() {
+        return consumerType;
+    }
+    
+    /**
+     * @return the explicitly configured tail tracking database name, or null if none was set
+     */
+    public String getTailTrackDb() {
+        return tailTrackDb;
+    }
+
+    /**
+     * Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will
+     * be picked by default (see {@link #getTailTrackingConfig()}). Dynamicity will not be taken into account even if
+     * enabled, i.e. the tail tracking database will not vary past endpoint initialisation.
+     * @param tailTrackDb database name
+     */
+    public void setTailTrackDb(String tailTrackDb) {
+        this.tailTrackDb = tailTrackDb;
+    }
+
+    /**
+     * @return the explicitly configured tail tracking collection name, or null if none was set
+     */
+    public String getTailTrackCollection() {
+        return tailTrackCollection;
+    }
+
+    /**
+     * Collection where tail tracking information will be persisted. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION}
+     * will be used by default.
+     * This setter stores the value as given; the default is not applied here.
+     * @param tailTrackCollection collection name
+     */
+    public void setTailTrackCollection(String tailTrackCollection) {
+        this.tailTrackCollection = tailTrackCollection;
+    }
+
+    /**
+     * @return the explicitly configured tail tracking field name, or null if none was set
+     */
+    public String getTailTrackField() {
+        return tailTrackField;
+    }
+
+    /**
+     * Field where the last tracked value will be placed. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_FIELD}
+     * will be used by default.
+     * This setter stores the value as given; the default is not applied here.
+     * @param tailTrackField field name
+     */
+    public void setTailTrackField(String tailTrackField) {
+        this.tailTrackField = tailTrackField;
+    }
+
+    /**
+     * Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts.
+     * The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records.
+     * This setter only records the flag; it is handed to the {@link MongoDbTailTrackingConfig} built by
+     * {@link #getTailTrackingConfig()}.
+     * @param persistentTailTracking true or false
+     */
+    public void setPersistentTailTracking(boolean persistentTailTracking) {
+        this.persistentTailTracking = persistentTailTracking;
+    }
+
+    /**
+     * @return the value of the persistentTailTracking flag
+     */
+    public boolean isPersistentTailTracking() {
+        return persistentTailTracking;
+    }
+    
+    /**
+     * Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every
+     * time it is generated.
+     * The cursor will be (re)created with a query of type: tailTrackIncreasingField > lastValue (possibly recovered from persistent
+     * tail tracking).
+     * Can be of type Integer, Date, String, etc.
+     * NOTE: No support for dot notation at the current time, so the field should be at the top level of the document.
+     * @param tailTrackIncreasingField name of the increasing field in the tailed documents
+     */
+    public void setTailTrackIncreasingField(String tailTrackIncreasingField) {
+        this.tailTrackIncreasingField = tailTrackIncreasingField;
+    }
+
+    /**
+     * @return the name of the increasing field configured for tail tracking
+     */
+    public String getTailTrackIncreasingField() {
+        return tailTrackIncreasingField;
+    }
+
+    public MongoDbTailTrackingConfig getTailTrackingConfig() {
+        if (tailTrackingConfig == null) {
+            tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, 
+                    tailTrackDb == null ? database : tailTrackDb, tailTrackCollection, tailTrackField, getPersistentId());
+        }
+        return tailTrackingConfig;       
+    }
+
+    /**
+     * MongoDB tailable cursors will block until new data arrives. If no new data is inserted, after some time the cursor will be automatically
+     * freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait
+     * before attempting to fetch a new cursor, and if the attempt fails, how long before the next attempt is made. Default value is 1000ms.
+     * This setter stores the value as given; no range check is performed here.
+     * @param cursorRegenerationDelay delay specified in milliseconds
+     */
+    public void setCursorRegenerationDelay(long cursorRegenerationDelay) {
+        this.cursorRegenerationDelay = cursorRegenerationDelay;
+    }
+
+    /**
+     * @return the configured cursor regeneration delay
+     */
+    public long getCursorRegenerationDelay() {
+        return cursorRegenerationDelay;
+    }
+
+    /**
+     * One tail tracking collection can host many trackers for several tailable consumers.
+     * To keep them separate, each tracker should have its own unique persistentId.
+     * This setter stores the value as given; uniqueness is not checked here.
+     * @param persistentId the value of the persistent ID to use for this tailable consumer
+     */
+    public void setPersistentId(String persistentId) {
+        this.persistentId = persistentId;
+    }
+
+    /**
+     * @return the persistent ID configured for this tailable consumer
+     */
+    public String getPersistentId() {
+        return persistentId;
+    }
+
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+/**
+ * Catalogue of the operations that the MongoDB endpoint and producer can be asked to execute.
+ * Operation names supplied as strings are resolved with {@code valueOf}, so they must match
+ * the constant names below exactly, including case.
+ */
+public enum MongoDbOperation {
+    
+    // read operations
+    findById,
+    findOneByQuery,
+    findAll,
+    // group, // future
+    // mapReduce, // future
+    
+    // create/update operations
+    insert,
+    save, 
+    update, 
+    
+    // delete operations
+    remove, 
+    
+    // others
+    getDbStats, 
+    getColStats, 
+    count, 
+    
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.CommandResult;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The MongoDb producer.
+ */
+public class MongoDbProducer extends DefaultProducer {
+    private static final transient Logger LOG = LoggerFactory.getLogger(MongoDbProducer.class);
+    private MongoDbEndpoint endpoint;
+
+    /**
+     * Creates a producer bound to the given endpoint. The endpoint is also kept in a typed field so that
+     * its MongoDB-specific settings can be read without casting.
+     * @param endpoint the MongoDB endpoint this producer sends to
+     */
+    public MongoDbProducer(MongoDbEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        MongoDbOperation operation = endpoint.getOperation();
+        Object header = exchange.getIn().getHeader(MongoDbConstants.OPERATION_HEADER);
+        if (header != null) {
+            LOG.debug("Overriding default operation with operation specified on header: {}", header);
+            try {
+                if (header instanceof MongoDbOperation) {
+                    operation = ObjectHelper.cast(MongoDbOperation.class, header);
+                } else {
+                    // evaluate as a String
+                    operation = MongoDbOperation.valueOf(exchange.getIn().getHeader(MongoDbConstants.OPERATION_HEADER, String.class));
+                }
+            } catch (Exception e) {
+                LOG.error("Operation not supported: {}", header);
+                exchange.setException(new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e));
+                return;
+            }
+        }
+        
+        try {
+            invokeOperation(operation, exchange);
+        } catch (Exception e) {
+            CamelMongoDbException partEx = MongoDbComponent.wrapInCamelMongoDbException(e);
+            LOG.error("Breaking MongoDB operation due to exception", partEx);
+            exchange.setException(partEx);
+        }
+        
+    }
+
+    /**
+     * Dispatches to the {@code doXxx} method that implements the requested operation.
+     * An operation without a matching branch is logged and reported as an exception on the exchange.
+     * @param operation the operation to execute
+     * @param exchange the exchange carrying the operation's input and receiving its output
+     * @throws Exception if the selected operation fails
+     */
+    protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception {
+        switch (operation) {
+        // read operations
+        case findById:
+            doFindById(exchange);
+            break;
+        case findOneByQuery:
+            doFindOneByQuery(exchange);
+            break;
+        case findAll:
+            doFindAll(exchange);
+            break;
+        case count:
+            doCount(exchange);
+            break;
+
+        // write operations
+        case insert:
+            doInsert(exchange);
+            break;
+        case save:
+            doSave(exchange);
+            break;
+        case update:
+            doUpdate(exchange);
+            break;
+        case remove:
+            doRemove(exchange);
+            break;
+
+        // statistics
+        case getDbStats:
+            doGetStats(exchange, 'D');
+            break;
+        case getColStats:
+            doGetStats(exchange, 'C');
+            break;
+
+        default:
+            LOG.error("Unexpected operation found: {}", operation);
+            exchange.setException(new CamelMongoDbException("Operation not supported. Value: " + operation));
+            break;
+        }
+    }
+
+    // ----------- MongoDB operations ----------------
+    
+    /**
+     * Places statistics in the OUT body: collection statistics for {@code 'C'}, statistics of the database owning
+     * the resolved collection for {@code 'D'}, and null for any other selector.
+     * @param exchange the exchange used to resolve the collection and to receive the result
+     * @param c selector: {@code 'C'} for collection stats, {@code 'D'} for database stats
+     */
+    protected void doGetStats(Exchange exchange, char c) {
+        DBObject stats;
+        switch (c) {
+        case 'C':
+            stats = calculateCollection(exchange).getStats();
+            break;
+        case 'D':
+            // go through the resolved collection so that the dynamicity option and the DB in use are honoured
+            stats = calculateCollection(exchange).getDB().getStats();
+            break;
+        default:
+            stats = null;
+            break;
+        }
+
+        exchange.getOut().setBody(stats);
+    }
+
+    /**
+     * Removes the documents matching the mandatory {@link DBObject} body, using the write concern from the
+     * message header when one is given. The OUT body is the {@link WriteResult} and the
+     * {@link MongoDbConstants#RECORDS_AFFECTED} header holds {@code result.getN()}.
+     * @param exchange the exchange carrying the removal criteria
+     * @throws Exception if the body is missing or not convertible, or the operation fails
+     */
+    protected void doRemove(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject criteria = exchange.getIn().getMandatoryBody(DBObject.class);
+        WriteConcern wc = extractWriteConcern(exchange);
+
+        WriteResult result;
+        if (wc != null) {
+            result = dbCol.remove(criteria, wc);
+        } else {
+            result = dbCol.remove(criteria);
+        }
+        processWriteResult(result, exchange);
+
+        // the WriteResult is always returned: whether or not getLastError was called, the caller can still
+        // call it or read the cached CommandResult
+        Message out = exchange.getOut();
+        out.setBody(result);
+        out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+    }
+
+    /**
+     * Updates documents in the resolved collection. The body must be a List of exactly two {@link DBObject}s:
+     * the selection criteria followed by the update object. The {@link MongoDbConstants#MULTIUPDATE} and
+     * {@link MongoDbConstants#UPSERT} headers are optional and treated as false when absent. The OUT body is the
+     * {@link WriteResult} and the {@link MongoDbConstants#RECORDS_AFFECTED} header holds {@code result.getN()}.
+     * @param exchange the exchange carrying the criteria/update pair
+     * @throws Exception if the body is missing, is not a two-element list, or the operation fails
+     */
+    @SuppressWarnings("unchecked")
+    protected void doUpdate(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>) (Class<?>) List.class);
+        if (saveObj.size() != 2) {
+            throw new CamelMongoDbException("MongoDB operation = update, failed because body is not a List of DBObject objects with size = 2");
+        }
+        
+        DBObject updateCriteria = saveObj.get(0);
+        DBObject objNew = saveObj.get(1);
+        
+        // absent headers resolve to false, which matches the driver defaults of update(DBObject, DBObject)
+        boolean upsert = calculateBooleanValue(exchange.getIn().getHeader(MongoDbConstants.UPSERT, Boolean.class));
+        boolean multi = calculateBooleanValue(exchange.getIn().getHeader(MongoDbConstants.MULTIUPDATE, Boolean.class));
+        
+        WriteConcern wc = extractWriteConcern(exchange);
+        // In API 2.7, the default upsert and multi values of update(DBObject, DBObject) are false, false, so we unconditionally invoke the
+        // full-signature method. The default behaviour may change in the future, so it's safer to be explicit at this level
+        // for full determinism
+        WriteResult result = wc == null ? dbCol.update(updateCriteria, objNew, upsert, multi)
+                : dbCol.update(updateCriteria, objNew, upsert, multi, wc);
+        
+        processWriteResult(result, exchange);
+        Message out = exchange.getOut();
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or 
+        // obtain the cached CommandResult
+        out.setBody(result);
+        out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+    }
+    
+    /**
+     * Saves the mandatory {@link DBObject} body into the resolved collection, using the write concern from the
+     * message header when one is given. The OUT body is the {@link WriteResult}.
+     * @param exchange the exchange carrying the object to save
+     * @throws Exception if the body is missing or not convertible, or the operation fails
+     */
+    protected void doSave(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject document = exchange.getIn().getMandatoryBody(DBObject.class);
+        WriteConcern wc = extractWriteConcern(exchange);
+
+        WriteResult result;
+        if (wc != null) {
+            result = dbCol.save(document, wc);
+        } else {
+            result = dbCol.save(document);
+        }
+        processWriteResult(result, exchange);
+
+        // the WriteResult is always returned: whether or not getLastError was called, the caller can still
+        // call it or read the cached CommandResult
+        exchange.getOut().setBody(result);
+    }
+    
+    /**
+     * Looks up a single document using the mandatory message body as the argument to {@code findOne}, optionally
+     * restricted by the {@link MongoDbConstants#FIELDS_FILTER} header. The OUT body is the document found (or null)
+     * and the {@link MongoDbConstants#RESULT_TOTAL_SIZE} header is 1 or 0 accordingly.
+     * @param exchange the exchange carrying the id
+     * @throws Exception if the body is missing or the operation fails
+     */
+    protected void doFindById(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        Object id = exchange.getIn().getMandatoryBody();
+        DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
+
+        DBObject found = fieldFilter == null ? dbCol.findOne(id) : dbCol.findOne(id, fieldFilter);
+
+        Message out = exchange.getOut();
+        out.setBody(found);
+        out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, found == null ? 0 : 1);
+    }
+
+    /**
+     * Inserts either a single document or a list of documents. The body is first converted to {@link DBObject};
+     * if that yields nothing it is converted to a List whose items are each converted to {@link DBObject}.
+     * The OUT body is the {@link WriteResult}.
+     * @param exchange the exchange carrying the document(s) to insert
+     * @throws Exception if the body fits neither form, an item cannot be converted, or the operation fails
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected void doInsert(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject single = exchange.getIn().getBody(DBObject.class);
+        List<DBObject> many = null;
+        if (single == null) {
+            // body could not be converted to DBObject, so try the List<DBObject> variant
+            List rawList = exchange.getIn().getBody(List.class);
+            if (rawList == null) {
+                throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type DBObject nor List<DBObject>");
+            }
+            // ensure that all items are of type DBObject
+            many = attemptConvertToList(rawList, exchange);
+        }
+
+        WriteConcern wc = extractWriteConcern(exchange);
+        WriteResult result;
+        if (single != null) {
+            result = wc == null ? dbCol.insert(single) : dbCol.insert(single, wc);
+        } else {
+            result = wc == null ? dbCol.insert(many) : dbCol.insert(many, wc);
+        }
+
+        processWriteResult(result, exchange);
+
+        // the WriteResult is always returned: whether or not getLastError was called, the caller can still
+        // call it or read the cached CommandResult
+        exchange.getOut().setBody(result);
+    }
+
+    /**
+     * Runs a query and returns all matching documents as a list in the OUT body. An empty body means no query,
+     * i.e. every document in the collection. Optional headers refine the cursor: {@link MongoDbConstants#FIELDS_FILTER},
+     * {@link MongoDbConstants#SORT_BY}, {@link MongoDbConstants#BATCH_SIZE}, {@link MongoDbConstants#NUM_TO_SKIP} and
+     * {@link MongoDbConstants#LIMIT}. The {@link MongoDbConstants#RESULT_TOTAL_SIZE} header is set from the cursor's
+     * {@code count()} and {@link MongoDbConstants#RESULT_PAGE_SIZE} from its {@code size()}.
+     * @param exchange the exchange carrying the optional query and the paging headers
+     * @throws Exception if the body cannot be converted or the operation fails
+     */
+    protected void doFindAll(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        Message in = exchange.getIn();
+
+        // getMandatoryBody is deliberately not used: an empty body means "retrieve everything".
+        // The type converter is only consulted when there actually is a body.
+        DBObject query = in.getBody() == null ? null : in.getBody(DBObject.class);
+        DBObject fieldFilter = in.getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
+
+        Integer batchSize = in.getHeader(MongoDbConstants.BATCH_SIZE, Integer.class);
+        Integer numToSkip = in.getHeader(MongoDbConstants.NUM_TO_SKIP, Integer.class);
+        Integer limit = in.getHeader(MongoDbConstants.LIMIT, Integer.class);
+        DBObject sortBy = in.getHeader(MongoDbConstants.SORT_BY, DBObject.class);
+
+        DBCursor cursor = null;
+        try {
+            if (fieldFilter != null) {
+                cursor = dbCol.find(query, fieldFilter);
+            } else if (query != null) {
+                cursor = dbCol.find(query);
+            } else {
+                cursor = dbCol.find(new BasicDBObject());
+            }
+
+            if (sortBy != null) {
+                cursor.sort(sortBy);
+            }
+            if (batchSize != null) {
+                cursor.batchSize(batchSize.intValue());
+            }
+            if (numToSkip != null) {
+                cursor.skip(numToSkip.intValue());
+            }
+            if (limit != null) {
+                cursor.limit(limit.intValue());
+            }
+
+            Message out = exchange.getOut();
+            out.setBody(cursor.toArray());
+            out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, cursor.count());
+            out.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, cursor.size());
+        } finally {
+            // make sure the cursor is closed, whether or not the query succeeded
+            if (cursor != null) {
+                cursor.close();
+            }
+        }
+    }
+
+    /**
+     * Finds a single document matching the mandatory {@link DBObject} query in the body, optionally restricted by
+     * the {@link MongoDbConstants#FIELDS_FILTER} header. The OUT body is the document found (or null) and the
+     * {@link MongoDbConstants#RESULT_TOTAL_SIZE} header is 1 or 0 accordingly.
+     * @param exchange the exchange carrying the query
+     * @throws Exception if the body is missing or not convertible, or the operation fails
+     */
+    protected void doFindOneByQuery(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject query = exchange.getIn().getMandatoryBody(DBObject.class);
+        DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
+
+        DBObject found = fieldFilter == null ? dbCol.findOne(query) : dbCol.findOne(query, fieldFilter);
+
+        Message out = exchange.getOut();
+        out.setBody(found);
+        out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, found == null ? 0 : 1);
+    }
+
+    /**
+     * Counts the documents in the resolved collection and places the total, as a {@link Long}, in the OUT body.
+     * The message body is not consulted, so the count is not filtered by any query.
+     * @param exchange the exchange used to resolve the collection and to receive the result
+     * @throws Exception if the operation fails
+     */
+    protected void doCount(Exchange exchange) throws Exception {
+        // count first, so the OUT message is only created once the count has succeeded
+        long total = calculateCollection(exchange).count();
+        exchange.getOut().setBody(Long.valueOf(total));
+    }
+    
+    // --------- Convenience methods -----------------------
+    
+    /**
+     * Resolves the collection an operation should run against. Unless dynamicity is enabled on the endpoint and
+     * at least one of the {@link MongoDbConstants#DATABASE} / {@link MongoDbConstants#COLLECTION} headers is set,
+     * the endpoint's own collection is returned. Otherwise each header overrides the corresponding endpoint value.
+     * @param exchange the exchange whose IN headers may name a database and/or collection
+     * @return the collection to operate on
+     */
+    private DBCollection calculateCollection(Exchange exchange) {
+        // dynamic resolution is optional; users who never enable it should not pay for the header lookups
+        if (!endpoint.isDynamicity()) {
+            return endpoint.getDbCollection();
+        }
+
+        String dynamicDB = exchange.getIn().getHeader(MongoDbConstants.DATABASE, String.class);
+        String dynamicCollection = exchange.getIn().getHeader(MongoDbConstants.COLLECTION, String.class);
+        if (dynamicDB == null && dynamicCollection == null) {
+            return endpoint.getDbCollection();
+        }
+
+        DB targetDb = dynamicDB == null ? endpoint.getDb() : endpoint.getMongoConnection().getDB(dynamicDB);
+        String collectionName = dynamicCollection != null ? dynamicCollection : endpoint.getCollection();
+        DBCollection target = targetDb.getCollection(collectionName);
+
+        LOG.debug("Dynamic database and/or collection selected: {}->{}", target.getDB().getName(), target.getName());
+        return target;
+    }
+    
+    /**
+     * Unboxes a possibly-null {@link Boolean}, treating null as false.
+     * @param b the value to unbox, may be null
+     * @return the primitive value, or false when {@code b} is null
+     */
+    private boolean calculateBooleanValue(Boolean b) {
+        return b != null && b.booleanValue();
+    }
+    
+    /**
+     * Publishes the outcome of a write. When the endpoint has invokeGetLastError set, or its write concern reports
+     * {@code callGetLastError()}, the last-error {@link CommandResult} is stored in the
+     * {@link MongoDbConstants#LAST_ERROR} OUT header, and an exception is set on the exchange if it is not ok.
+     * <p/>
+     * NOTE(review): only the endpoint's {@code getWriteConcern()} is consulted here; a write concern supplied via
+     * the message header or via the registry reference does not trigger this check - confirm whether that is intended.
+     * @param result the result of the write operation
+     * @param exchange the exchange to receive the header and, on failure, the exception
+     */
+    private void processWriteResult(WriteResult result, Exchange exchange) {
+        WriteConcern endpointConcern = endpoint.getWriteConcern();
+        boolean inspectLastError = endpoint.isInvokeGetLastError()
+                || (endpointConcern != null && endpointConcern.callGetLastError());
+        if (!inspectLastError) {
+            return;
+        }
+
+        // prefer the cached result; only ask for the last error when none has been cached
+        CommandResult cr = result.getCachedLastError();
+        if (cr == null) {
+            cr = result.getLastError();
+        }
+
+        exchange.getOut().setHeader(MongoDbConstants.LAST_ERROR, cr);
+        if (!cr.ok()) {
+            exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException()));
+        }
+    }
+    
+    /**
+     * Reads the per-message write concern from the {@link MongoDbConstants#WRITECONCERN} header.
+     * The header may hold a {@link WriteConcern} instance or the name of a standard one, which is resolved
+     * with {@link WriteConcern#valueOf(String)}.
+     * @param exchange the exchange whose IN headers are inspected
+     * @return the write concern to use, or null when the header is absent or of an unsupported type
+     * @throws CamelMongoDbException if the header is a String that does not resolve to a WriteConcern
+     */
+    private WriteConcern extractWriteConcern(Exchange exchange) throws CamelMongoDbException {
+        Object o = exchange.getIn().getHeader(MongoDbConstants.WRITECONCERN);
+        
+        if (o == null) {
+            return null;
+        } else if (o instanceof WriteConcern) {
+            return ObjectHelper.cast(WriteConcern.class, o);
+        } else if (o instanceof String) {
+            WriteConcern answer = WriteConcern.valueOf(ObjectHelper.cast(String.class, o));
+            if (answer == null) {
+                throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN 
+                        + " header, with value " + o + " could not be resolved to a WriteConcern type");
+            }
+            // hand back the resolved concern; without this return it was discarded and null was returned instead
+            return answer;
+        }
+        
+        // only reached when the header holds something other than a WriteConcern or a String
+        LOG.warn("A problem occurred while resolving the Exchange's Write Concern");
+        return null;
+    }
+    
+    /**
+     * Converts every item of a raw list to {@link DBObject} using the Camel type converter.
+     * @param insertList the raw list taken from the message body
+     * @param exchange the exchange providing access to the type converter
+     * @return a new list holding the converted items, in the same order
+     * @throws CamelMongoDbException if any item cannot be converted to {@link DBObject}
+     */
+    @SuppressWarnings("rawtypes")
+    private List<DBObject> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
+        List<DBObject> converted = new ArrayList<DBObject>(insertList.size());
+        TypeConverter typeConverter = exchange.getContext().getTypeConverter();
+        for (Object candidate : insertList) {
+            try {
+                converted.add(typeConverter.mandatoryConvertTo(DBObject.class, candidate));
+            } catch (Exception e) {
+                throw new CamelMongoDbException("MongoDB operation = insert, Assuming List variant of MongoDB insert operation, but List contains non-DBObject items", e);
+            }
+        }
+        return converted;
+    }
+    
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+/**
+ * Value holder for the tail tracking options of a MongoDB tailable cursor consumer.
+ * <p/>
+ * Every field is assigned exactly once, in the constructor. A null collection or
+ * field name is replaced by {@link #DEFAULT_COLLECTION} / {@link #DEFAULT_FIELD}.
+ */
+public class MongoDbTailTrackingConfig {
+
+    public static final String DEFAULT_COLLECTION = "camelTailTracking";
+    public static final String DEFAULT_FIELD = "lastTrackingValue";
+
+    /** See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)} */
+    public final String increasingField;
+
+    /** See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)} */
+    public final boolean persistent;
+
+    /** See {@link MongoDbEndpoint#setTailTrackDb(String)} */
+    public final String db;
+
+    /** See {@link MongoDbEndpoint#setTailTrackCollection(String)} */
+    public final String collection;
+
+    /** See {@link MongoDbEndpoint#setTailTrackField(String)} */
+    public final String field;
+
+    /** See {@link MongoDbEndpoint#setPersistentId(String)} */
+    public final String persistentId;
+
+    /**
+     * Captures the tail tracking settings.
+     *
+     * @param persistentTailTracking stored in {@link #persistent}
+     * @param tailTrackIncreasingField stored in {@link #increasingField}
+     * @param tailTrackDb stored in {@link #db}
+     * @param tailTrackCollection stored in {@link #collection}; null selects {@link #DEFAULT_COLLECTION}
+     * @param tailTrackField stored in {@link #field}; null selects {@link #DEFAULT_FIELD}
+     * @param persistentId stored in {@link #persistentId}
+     */
+    public MongoDbTailTrackingConfig(boolean persistentTailTracking, String tailTrackIncreasingField, String tailTrackDb,
+            String tailTrackCollection, String tailTrackField, String persistentId) {
+        this.persistent = persistentTailTracking;
+        this.increasingField = tailTrackIncreasingField;
+        this.db = tailTrackDb;
+        this.persistentId = persistentId;
+
+        if (tailTrackCollection == null) {
+            this.collection = DEFAULT_COLLECTION;
+        } else {
+            this.collection = tailTrackCollection;
+        }
+
+        if (tailTrackField == null) {
+            this.field = DEFAULT_FIELD;
+        } else {
+            this.field = tailTrackField;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.WriteConcern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Remembers the last value of the increasing field read by a tailable cursor consumer and, when
+ * persistent tail tracking is enabled, stores that value in and recovers it from a MongoDB collection.
+ * <p/>
+ * The tracking document is looked up by the configured persistentId.
+ */
+public class MongoDbTailTrackingManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
+
+    /**
+     * Last value read from the increasing field; null when no value has been tracked or recovered.
+     */
+    public Object lastVal;
+
+    private final Mongo connection;
+    private final MongoDbTailTrackingConfig config;
+    private DBCollection dbCol;
+    // selector holding only the _id of the tracking document; built by initialize()
+    private DBObject trackingObj;
+
+    public MongoDbTailTrackingManager(Mongo connection, MongoDbTailTrackingConfig config) {
+        this.connection = connection;
+        this.config = config;
+    }
+
+    /**
+     * Locates the tracking document for the configured persistentId, creating it if it does not exist yet.
+     * Does nothing when persistent tail tracking is disabled.
+     *
+     * @throws Exception if the tracking document cannot be read or created
+     */
+    public void initialize() throws Exception {
+        if (!config.persistent) {
+            return;
+        }
+
+        dbCol = connection.getDB(config.db).getCollection(config.collection);
+        DBObject filter = new BasicDBObject("persistentId", config.persistentId);
+        DBObject existing = dbCol.findOne(filter);
+        if (existing == null) {
+            dbCol.insert(filter, WriteConcern.SAFE);
+            // re-read with the same filter: the collection may also hold tracking documents that belong
+            // to other persistentIds, and an unfiltered findOne() could return one of those instead
+            existing = dbCol.findOne(filter);
+        }
+        // keep only the _id, the rest is useless and causes more overhead during update
+        trackingObj = new BasicDBObject("_id", existing.get("_id"));
+    }
+
+    /**
+     * Writes {@link #lastVal} into the configured field of the tracking document.
+     * Does nothing when persistent tail tracking is disabled or no value has been tracked yet.
+     */
+    public synchronized void persistToStore() {
+        if (!config.persistent || lastVal == null) {
+            return;
+        }
+
+        LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection);
+
+        DBObject updateObj = BasicDBObjectBuilder.start().add("$set", new BasicDBObject(config.field, lastVal)).get();
+        // trackingObj is deliberately left untouched afterwards: it must remain the _id-only selector of
+        // this consumer's document, not whatever document an unfiltered findOne() happens to return
+        dbCol.update(trackingObj, updateObj, false, false, WriteConcern.SAFE);
+    }
+
+    /**
+     * Reads the persisted value of the configured field back into {@link #lastVal}.
+     *
+     * @return the recovered value, or null when persistence is disabled or nothing has been stored
+     */
+    public synchronized Object recoverFromStore() {
+        if (!config.persistent) {
+            return null;
+        }
+
+        DBObject stored = dbCol.findOne(trackingObj);
+        // the tracking document may have been removed since initialize(); treat that as nothing stored
+        lastVal = stored == null ? null : stored.get(config.field);
+
+        LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection);
+
+        return lastVal;
+    }
+
+    /**
+     * Records the value of the increasing field of the given document as the last value seen.
+     * Does nothing when no increasing field is configured.
+     *
+     * @param o the document that has just been read from the tailable cursor
+     */
+    public void setLastVal(DBObject o) {
+        if (config.increasingField == null) {
+            return;
+        }
+
+        lastVal = o.get(config.increasingField);
+    }
+
+    /**
+     * @return the name of the configured increasing field, possibly null
+     */
+    public String getIncreasingFieldName() {
+        return config.increasingField;
+    }
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+/**
+ * The MongoDb tailable cursor consumer. While started, it runs a {@link MongoDbTailingProcess}
+ * on a thread pool of size one obtained from the Camel context's ExecutorServiceManager.
+ */
+public class MongoDbTailableCursorConsumer extends DefaultConsumer {
+    private final MongoDbEndpoint endpoint;
+    private ExecutorService executor;
+    private MongoDbTailingProcess tailingProcess;
+
+    public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Stops the tailing process, then hands the thread pool back to the ExecutorServiceManager that created it.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        try {
+            if (tailingProcess != null) {
+                tailingProcess.stop();
+            }
+        } finally {
+            // release the pool even if stopping the tailing process failed
+            if (executor != null) {
+                endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
+                executor = null;
+            }
+        }
+    }
+
+    /**
+     * Initialises tail tracking and the tailing process, then submits the process to a new single-thread pool.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        // run every step that can fail before allocating the thread pool and before publishing the process,
+        // so that a failed start neither leaks a pool nor leaves a never-run process for doStop() to wait on
+        MongoDbTailTrackingManager trackingManager = initTailTracking();
+        MongoDbTailingProcess process = new MongoDbTailingProcess(endpoint, this, trackingManager);
+        process.initializeProcess();
+
+        executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1);
+        tailingProcess = process;
+        executor.execute(tailingProcess);
+    }
+
+    /**
+     * Creates and initialises the tail tracking manager from the endpoint's connection and tail tracking configuration.
+     */
+    protected MongoDbTailTrackingManager initTailTracking() throws Exception {
+        MongoDbTailTrackingManager answer = new MongoDbTailTrackingManager(endpoint.getMongoConnection(), endpoint.getTailTrackingConfig());
+        answer.initialize();
+        return answer;
+    }
+
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mongodb;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.Bytes;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoException.CursorNotFound;
+
+import org.apache.camel.Exchange;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that tails a capped collection through a tailable cursor and passes every document read
+ * to the consumer's processor, regenerating the cursor whenever it is exhausted or dies.
+ */
+public class MongoDbTailingProcess implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
+    private static final String CAPPED_KEY = "capped";
+
+    public volatile boolean keepRunning = true;
+    public volatile boolean stopped; // = false
+
+    private final DBCollection dbCol;
+    private final MongoDbEndpoint endpoint;
+    private final MongoDbTailableCursorConsumer consumer;
+
+    // create local, final copies of these variables for increased performance
+    private final long cursorRegenerationDelay;
+    private final boolean cursorRegenerationDelayEnabled;
+
+    // released when run() ends, normally or with an exception, so stop() can block on it instead of spinning
+    private final CountDownLatch terminated = new CountDownLatch(1);
+    // true once run() has been entered; stop() only waits for a loop that actually started
+    private volatile boolean started;
+    // volatile: assigned by the tailing thread, read by stop() from the thread stopping the consumer
+    private volatile DBCursor cursor;
+    private MongoDbTailTrackingManager tailTracking;
+
+    public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
+        this.endpoint = endpoint;
+        this.consumer = consumer;
+        this.dbCol = endpoint.getDbCollection();
+        this.tailTracking = tailTrack;
+        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
+        this.cursorRegenerationDelayEnabled = this.cursorRegenerationDelay != 0;
+    }
+
+    public DBCursor getCursor() {
+        return cursor;
+    }
+
+    /**
+     * Initialise the tailing process, the cursor and if persistent tail tracking is enabled, recover the cursor from the persisted point.
+     * As part of the initialisation process, the component will validate that the collection we are targeting is 'capped'.
+     *
+     * @throws Exception if the collection is not capped or the cursor cannot be initialised
+     */
+    public void initializeProcess() throws Exception {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+        }
+
+        if (dbCol.getStats().getInt(CAPPED_KEY) != 1) {
+            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + dbCol.getName()
+                    + " is not capped");
+        }
+        try {
+            // recover the last value from the store if it exists
+            tailTracking.recoverFromStore();
+            cursor = initializeCursor();
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Exception ocurred while initializing tailable cursor", e);
+        }
+
+        if (cursor == null) {
+            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
+        }
+
+    }
+
+    /**
+     * The main loop: reads from the cursor until it ends, then regenerates it, until asked to stop.
+     * The process is always flagged as stopped on exit, even when the loop ends with an exception, so that stop() cannot wait forever.
+     */
+    @Override
+    public void run() {
+        started = true;
+        try {
+            while (keepRunning) {
+                doRun();
+                // if the previous call didn't return because we have stopped running, then regenerate the cursor
+                if (keepRunning) {
+                    cursor.close();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal, cursorRegenerationDelay);
+                    }
+
+                    if (cursorRegenerationDelayEnabled) {
+                        try {
+                            Thread.sleep(cursorRegenerationDelay);
+                        } catch (InterruptedException e) {
+                            LOG.error("Thread was interrupted", e);
+                            // an interrupt is a request to terminate: restore the flag and leave the loop
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+
+                    cursor = initializeCursor();
+                }
+            }
+        } finally {
+            stopped = true;
+            terminated.countDown();
+        }
+    }
+
+    /**
+     * Asks the main loop to finish and blocks until it has done so.
+     *
+     * @throws Exception if closing the cursor fails
+     */
+    protected void stop() throws Exception {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+        }
+        keepRunning = false;
+        // close the cursor if it's open, so if it is blocked on hasNext() it will return immediately
+        DBCursor current = cursor;
+        if (current != null) {
+            current.close();
+        }
+        // wait until the main loop acknowledges the stop; a loop that never started has nothing to acknowledge
+        if (started) {
+            try {
+                terminated.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.warn("Interrupted while waiting for the MongoDB tailing process to stop", e);
+                return;
+            }
+        }
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+        }
+    }
+
+    /**
+     * Drains the current cursor: each document is wrapped in an exchange, sent to the consumer's processor and
+     * recorded in the tail tracking manager. When the cursor ends, the last tracked value is persisted.
+     */
+    private void doRun() {
+        // keepRunning is checked first so that hasNext(), which may block, is not called once a stop was requested;
+        // a cursorId of 0 symbolizes that the cursor is dead
+        try {
+            while (keepRunning && cursor.hasNext() && cursor.getCursorId() != 0) {
+                DBObject dbObj = cursor.next();
+                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+                try {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get("_id"));
+                    }
+                    consumer.getProcessor().process(exchange);
+                } catch (Exception e) {
+                    LOG.warn("Exception ocurred while processing exchange with ID " + exchange.getExchangeId(), e);
+                    if (exchange.getException() != e) {
+                        exchange.setException(e);
+                    }
+                }
+                tailTracking.setLastVal(dbObj);
+            }
+        } catch (CursorNotFound e) {
+            // we only log if we are not stopping, otherwise it is expected because the stop() method kills the cursor just in case it is blocked
+            // waiting for more data to arrive
+            if (keepRunning) {
+                LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
+            }
+        }
+
+        // the loop finished, persist the lastValue just in case we are shutting down
+        // TODO: perhaps add a functionality to persist every N records
+        tailTracking.persistToStore();
+    }
+
+    /**
+     * Builds a tailable, await-data cursor. When a last tracked value is known, only documents whose
+     * increasing field is greater than that value are selected; otherwise no query filter is applied.
+     */
+    private DBCursor initializeCursor() {
+        Object lastVal = tailTracking.lastVal;
+        // lastVal can be null if we are initializing and there is no persistence enabled
+        DBCursor answer;
+        if (lastVal == null) {
+            answer = dbCol.find().addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+        } else {
+            DBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
+            answer = dbCol.find(queryObj).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+        }
+        return answer;
+    }
+}

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message