camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [10/13] camel git commit: CAMEL-10554 - Camel Mongodb evolution to driver 3. Fixed CS
Date Sun, 18 Dec 2016 09:07:26 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
index d1f04c2..cb33fe4 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
@@ -16,27 +16,6 @@
  */
 package org.apache.camel.component.mongodb3;
 
-import static com.mongodb.client.model.Filters.eq;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.BATCH_SIZE;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION_INDEX;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.DATABASE;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.CRITERIA;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.FIELDS_PROJECTION;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.LIMIT;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.MULTIUPDATE;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.NUM_TO_SKIP;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.OID;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.OPERATION_HEADER;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_AFFECTED;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_MATCHED;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_PAGE_SIZE;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_TOTAL_SIZE;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.SORT_BY;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.UPSERT;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.WRITERESULT;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -44,6 +23,15 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.Processor;
@@ -57,510 +45,529 @@ import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.mongodb.client.AggregateIterable;
-import com.mongodb.client.FindIterable;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.Filters;
-import com.mongodb.client.model.UpdateOptions;
-import com.mongodb.client.result.DeleteResult;
-import com.mongodb.client.result.UpdateResult;
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.BATCH_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION_INDEX;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.CRITERIA;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.DATABASE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.FIELDS_PROJECTION;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.LIMIT;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MULTIUPDATE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.NUM_TO_SKIP;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.OID;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.OPERATION_HEADER;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_AFFECTED;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_MATCHED;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_PAGE_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_TOTAL_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.SORT_BY;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.UPSERT;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.WRITERESULT;
 
 /**
  * The MongoDb producer.
  */
 public class MongoDbProducer extends DefaultProducer {
-	private static final Logger LOG = LoggerFactory.getLogger(MongoDbProducer.class);
-	private final Map<MongoDbOperation, Processor> operations = new HashMap<>();
-	private MongoDbEndpoint endpoint;
-
-	{
-		bind(MongoDbOperation.aggregate, createDoAggregate());
-		bind(MongoDbOperation.command, createDoCommand());
-		bind(MongoDbOperation.count, createDoCount());
-		bind(MongoDbOperation.findAll, createDoFindAll());
-		bind(MongoDbOperation.findById, createDoFindById());
-		bind(MongoDbOperation.findOneByQuery, createDoFindOneByQuery());
-		bind(MongoDbOperation.getColStats, createDoGetColStats());
-		bind(MongoDbOperation.getDbStats, createDoGetDbStats());
-		bind(MongoDbOperation.insert, createDoInsert());
-		bind(MongoDbOperation.remove, createDoRemove());
-		bind(MongoDbOperation.save, createDoSave());
-		bind(MongoDbOperation.update, createDoUpdate());
-	}
-
-	public MongoDbProducer(MongoDbEndpoint endpoint) {
-		super(endpoint);
-		this.endpoint = endpoint;
-	}
-
-	public void process(Exchange exchange) throws Exception {
-		MongoDbOperation operation = endpoint.getOperation();
-		Object header = exchange.getIn().getHeader(OPERATION_HEADER);
-		if (header != null) {
-			LOG.debug("Overriding default operation with operation specified on header: {}", header);
-			try {
-				if (header instanceof MongoDbOperation) {
-					operation = ObjectHelper.cast(MongoDbOperation.class, header);
-				} else {
-					// evaluate as a String
-					operation = MongoDbOperation.valueOf(exchange.getIn().getHeader(OPERATION_HEADER, String.class));
-				}
-			} catch (Exception e) {
-				throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e);
-			}
-		}
-
-		try {
-			invokeOperation(operation, exchange);
-		} catch (Exception e) {
-			throw MongoDbComponent.wrapInCamelMongoDbException(e);
-		}
-
-	}
-
-	/**
-	 * Entry method that selects the appropriate MongoDB operation and executes it
-	 *
-	 * @param operation
-	 * @param exchange
-	 */
-	protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception {
-		Processor processor = operations.get(operation);
-		if (processor != null) {
-			processor.process(exchange);
-		} else {
-			throw new CamelMongoDbException("Operation not supported. Value: " + operation);
-		}
-	}
-
-	private MongoDbProducer bind(MongoDbOperation operation, Function<Exchange, Object> mongoDbFunction) {
-		operations.put(operation, wrap(mongoDbFunction, operation));
-		return this;
-	}
-
-	// ----------- MongoDB operations ----------------
-
-	private Document createDbStatsCommand() {
-		return new Document("dbStats", 1).append("scale", 1);
-	}
-
-	private Document createCollStatsCommand(String collectionName) {
-		return new Document("collStats", collectionName);
-	}
-
-
-	// --------- Convenience methods -----------------------
-	private MongoDatabase calculateDb(Exchange exchange) {
-		// dynamic calculation is an option. In most cases it won't be used and we should not penalise all users with running this
-		// resolution logic on every Exchange if they won't be using this functionality at all
-		if (!endpoint.isDynamicity()) {
-			return endpoint.getMongoDatabase();
-		}
-
-		String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
-		MongoDatabase db;
-
-		if (dynamicDB == null) {
-			db = endpoint.getMongoDatabase();
-		} else {
-			db = endpoint.getMongoConnection().getDatabase(dynamicDB);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Dynamic database selected: {}", db.getName());
-		}
-		return db;
-	}
-
-	private String calculateCollectionName(Exchange exchange) {
-		if (!endpoint.isDynamicity()) {
-			return endpoint.getCollection();
-		}
-		String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
-		if (dynamicCollection == null) {
-			return endpoint.getCollection();
-		}
-		return dynamicCollection;
-	}
-
-	private MongoCollection<Document> calculateCollection(Exchange exchange) {
-		// dynamic calculation is an option. In most cases it won't be used and we should not penalise all users with running this
-		// resolution logic on every Exchange if they won't be using this functionality at all
-		if (!endpoint.isDynamicity()) {
-			return endpoint.getMongoCollection()
-					.withWriteConcern(endpoint.getWriteConcern());
-		}
-
-		String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
-		String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
-
-		@SuppressWarnings("unchecked")
-		List<Bson> dynamicIndex = exchange.getIn().getHeader(COLLECTION_INDEX, List.class);
-
-		MongoCollection<Document> dbCol;
-
-		if (dynamicDB == null && dynamicCollection == null) {
-			dbCol = endpoint.getMongoCollection()
-					.withWriteConcern(endpoint.getWriteConcern());
-		} else {
-			MongoDatabase db = calculateDb(exchange);
-
-			if (dynamicCollection == null) {
-				dbCol = db.getCollection(endpoint.getCollection(), Document.class);
-			} else {
-				dbCol = db.getCollection(dynamicCollection, Document.class);
-
-				// on the fly add index
-				if (dynamicIndex == null) {
-					endpoint.ensureIndex(dbCol, endpoint.createIndex());
-				} else {
-					endpoint.ensureIndex(dbCol, dynamicIndex);
-				}
-			}
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Dynamic database and/or collection selected: {}->{}", endpoint.getDatabase(), endpoint.getCollection());
-		}
-		return dbCol;
-	}
-
-	@SuppressWarnings("rawtypes")
-	private List<Document> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
-		List<Document> documentList = new ArrayList<>(insertList.size());
-		TypeConverter converter = exchange.getContext().getTypeConverter();
-		for (Object item : insertList) {
-			try {
-				Document document = converter.mandatoryConvertTo(Document.class, item);
-				documentList.add(document);
-			} catch (Exception e) {
-				throw new CamelMongoDbException("MongoDB operation = insert, Assuming List variant of MongoDB insert operation, but List contains non-Document items", e);
-			}
-		}
-		return documentList;
-	}
-
-	private boolean isWriteOperation(MongoDbOperation operation) {
-		return MongoDbComponent.WRITE_OPERATIONS.contains(operation);
-	}
-
-	private Processor wrap(Function<Exchange, Object> supplier, MongoDbOperation operation) {
-		return exchange -> {
-			Object result = supplier.apply(exchange);
-			copyHeaders(exchange);
-			moveBodyToOutIfResultIsReturnedAsHeader(exchange, operation);
-			processAndTransferResult(result, exchange, operation);
-		};
-	}
-
-	private void copyHeaders(Exchange exchange) {
-		MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
-	}
-
-	private void moveBodyToOutIfResultIsReturnedAsHeader(Exchange exchange, MongoDbOperation operation) {
-		if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
-			exchange.getOut().setBody(exchange.getIn().getBody());
-		}
-	}
-
-	private void processAndTransferResult(Object result, Exchange exchange, MongoDbOperation operation) {
-		// determine where to set the WriteResult: as the OUT body or as an IN message header
-		if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
-			exchange.getOut().setHeader(WRITERESULT, result);
-		} else {
-			exchange.getOut().setBody(result);
-		}
-	}
-
-	private Function<Exchange, Object> createDoGetColStats() {
-		return exch ->
-		calculateDb(exch).runCommand(createCollStatsCommand(calculateCollectionName(exch)));
-	}
-
-	private Function<Exchange, Object> createDoFindOneByQuery() {
-		return exch -> {
-			try {
-				MongoCollection<Document> dbCol = calculateCollection(exch);
-
-				Bson query = exch.getIn().getHeader(CRITERIA, Bson.class);
-				if(null == query) {
-					query = exch.getIn().getMandatoryBody(Bson.class);
-				}
-
-				Bson sortBy = exch.getIn().getHeader(SORT_BY, Bson.class);
-				Bson fieldFilter = exch.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
-
-				if (fieldFilter == null) {
-					fieldFilter = new Document();
-				}
-
-				if (sortBy == null) {
-					sortBy = new Document();
-				}
-
-				Document ret = dbCol.find(query).projection(fieldFilter).sort(sortBy).first();
-				exch.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
-				return ret;
-			} catch (InvalidPayloadException e) {
-				throw new CamelMongoDbException("Payload is no Document", e);
-			}
-		};
-	}
-
-	private Function<Exchange, Object> createDoCount() {
-		return exch -> {
-			Bson query = exch.getIn().getHeader(CRITERIA, Bson.class);
-			if (null==query) {
-				query = exch.getIn().getBody(Bson.class);
-			}
-			if (query == null) {
-				query = new Document();
-			}
-			return calculateCollection(exch).count(query);
-		};
-	}
-
-	private Function<Exchange, Object> createDoFindAll() {
-		return exchange1 -> {
-			Iterable<Document> result;
-			MongoCollection<Document> dbCol = calculateCollection(exchange1);
-			// do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection
-			Bson query = exchange1.getIn().getHeader(CRITERIA, Bson.class);
-			// do not run around looking for a type converter unless there is a need for it
-			if (null == query && exchange1.getIn().getBody() != null) {
-				query = exchange1.getIn().getBody(Bson.class);
-			}
-			Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
-
-			// get the batch size and number to skip
-			Integer batchSize = exchange1.getIn().getHeader(BATCH_SIZE, Integer.class);
-			Integer numToSkip = exchange1.getIn().getHeader(NUM_TO_SKIP, Integer.class);
-			Integer limit = exchange1.getIn().getHeader(LIMIT, Integer.class);
-			Document sortBy = exchange1.getIn().getHeader(SORT_BY, Document.class);
-			FindIterable<Document> ret;
-			if (query == null && fieldFilter == null) {
-				ret = dbCol.find(new Document());
-			} else if (fieldFilter == null) {
-				ret = dbCol.find(query);
-			} else if (query != null) {
-				ret = dbCol.find(query).projection(fieldFilter);
-			} else {
-				ret = dbCol.find(new Document()).projection(fieldFilter);
-			}
-
-			if (sortBy != null) {
-				ret.sort(sortBy);
-			}
-
-			if (batchSize != null) {
-				ret.batchSize(batchSize);
-			}
-
-			if (numToSkip != null) {
-				ret.skip(numToSkip);
-			}
-
-			if (limit != null) {
-				ret.limit(limit);
-			}
-
-			if (!MongoDbOutputType.MongoIterable.equals(endpoint.getOutputType())) {
-				try {
-					result = new ArrayList<>();
-					ret.iterator().forEachRemaining(((List<Document>) result)::add);
-					exchange1.getOut().setHeader(RESULT_PAGE_SIZE, ((List<Document>) result).size());
-				} finally {
-					ret.iterator().close();
-				}
-			} else {
-				result = ret;
-			}
-			return result;
-		};
-	}
-
-	private Function<Exchange, Object> createDoInsert() {
-		return exchange1 -> {
-			MongoCollection<Document> dbCol = calculateCollection(exchange1);
-			boolean singleInsert = true;
-			Object insert = exchange1.getIn().getBody(Document.class);
-			// body could not be converted to Document, check to see if it's of type List<Document>
-			if (insert == null) {
-				insert = exchange1.getIn().getBody(List.class);
-				// if the body of type List was obtained, ensure that all items are of type Document and cast the List to List<Document>
-				if (insert != null) {
-					singleInsert = false;
-					insert = attemptConvertToList((List<?>) insert, exchange1);
-				} else {
-					throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type Document nor List<Document>");
-				}
-			}
-
-			if (singleInsert) {
-				Document insertObject = Document.class.cast(insert);
-				dbCol.insertOne(insertObject);
-
-				exchange1.getIn().setHeader(OID, insertObject.get(MONGO_ID));
-			} else {
-				@SuppressWarnings("unchecked")
-				List<Document> insertObjects = (List<Document>) insert;
-				dbCol.insertMany(insertObjects);
-				List<Object> objectIdentification = new ArrayList<>(insertObjects.size());
-				objectIdentification.addAll(insertObjects.stream().map(insertObject -> insertObject.get(MONGO_ID)).collect(Collectors.toList()));
-				exchange1.getIn().setHeader(OID, objectIdentification);
-			}
-			return insert;
-		};
-	}
-
-	private Function<Exchange, Object> createDoUpdate() {
-		return exchange1 -> {
-			try {
-				MongoCollection<Document> dbCol = calculateCollection(exchange1);
-
-				Bson updateCriteria = exchange1.getIn().getHeader(CRITERIA, Bson.class);
-				Bson objNew;
-				if (null == updateCriteria){
-					@SuppressWarnings("unchecked")
-					List<Bson> saveObj = exchange1.getIn().getMandatoryBody((Class<List<Bson>>) Class.class.cast(List.class));
-					if (saveObj.size() != 2) {
-						throw new CamelMongoDbException("MongoDB operation = insert, failed because body is not a List of Document objects with size = 2");
-					}
-
-					updateCriteria = saveObj.get(0);
-					objNew = saveObj.get(1);
-				} else {
-					objNew = exchange1.getIn().getMandatoryBody(Bson.class);
-				}
-
-				Boolean multi = exchange1.getIn().getHeader(MULTIUPDATE, Boolean.class);
-				Boolean upsert = exchange1.getIn().getHeader(UPSERT, Boolean.class);
-
-				UpdateResult result;
-				UpdateOptions options = new UpdateOptions();
-				if (upsert != null) {
-					options.upsert(upsert);
-				}
-
-				if (multi == null || !multi) {
-					result = dbCol.updateOne(updateCriteria, objNew, options);
-				} else {
-					result = dbCol.updateMany(updateCriteria, objNew, options);
-				}
-				if (result.isModifiedCountAvailable()) {
-					exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getModifiedCount());
-				}
-				exchange1.getOut().setHeader(RECORDS_MATCHED, result.getMatchedCount());
-				return result;
-			} catch (InvalidPayloadException e) {
-				throw new CamelMongoDbException("Invalid payload for update", e);
-			}
-		};
-	}
-
-	private Function<Exchange, Object> createDoRemove() {
-		return exchange1 -> {
-			try {
-				MongoCollection<Document> dbCol = calculateCollection(exchange1);
-				Document removeObj = exchange1.getIn().getMandatoryBody(Document.class);
-
-				DeleteResult result = dbCol.deleteMany(removeObj);
-				if (result.wasAcknowledged()) {
-					exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getDeletedCount());
-				}
-				return result;
-			} catch (InvalidPayloadException e) {
-				throw new CamelMongoDbException("Invalid payload for remove", e);
-			}
-		};
-	}
-
-	private Function<Exchange, Object> createDoAggregate() {
-		return exchange1 -> {
-			try {
-				MongoCollection<Document> dbCol = calculateCollection(exchange1);
-
-				// Impossible with java driver to get the batch size and number to skip
-				List<Document> dbIterator = new ArrayList<>();
-				AggregateIterable<Document> aggregationResult;
-
-				@SuppressWarnings("unchecked")
-				List<Bson> query = exchange1.getIn().getMandatoryBody((Class<List<Bson>>) Class.class.cast(List.class));
-
-				// Allow body to be a pipeline
-				// @see http://docs.mongodb.org/manual/core/aggregation/
-				if (null != query) {
-					List<Bson> queryList = query.stream().map(o -> (Bson) o).collect(Collectors.toList());
-					aggregationResult = dbCol.aggregate(queryList);
-				} else {
-					List<Bson> queryList = new ArrayList<>();
-					queryList.add(Bson.class.cast(exchange1.getIn().getMandatoryBody(Bson.class)));
-					aggregationResult = dbCol.aggregate(queryList);
-				}
-				aggregationResult.iterator().forEachRemaining(dbIterator::add);
-				return dbIterator;
-			} catch (InvalidPayloadException e) {
-				throw new CamelMongoDbException("Invalid payload for aggregate", e);
-			}
-		};
-	}
-
-	private Function<Exchange, Object> createDoCommand() {
-		return exchange1 -> {
-			try {
-				MongoDatabase db = calculateDb(exchange1);
-				Document cmdObj = exchange1.getIn().getMandatoryBody(Document.class);
-				return db.runCommand(cmdObj);
-			} catch (InvalidPayloadException e) {
-				throw new CamelMongoDbException("Invalid payload for command", e);
-			}
-		};
-	}
-
-	private Function<Exchange, Object> createDoGetDbStats() {
-		return exchange1 -> calculateDb(exchange1).runCommand(createDbStatsCommand());
-	}
-
-	private Function<Exchange, Object> createDoFindById() {
-		return exchange1 -> {
-			try {
-				MongoCollection<Document> dbCol = calculateCollection(exchange1);
-				Object id = exchange1.getIn().getMandatoryBody();
-				Bson o = Filters.eq(MONGO_ID, id);
-				Document ret;
-
-				Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
-				if (fieldFilter == null) {
-					fieldFilter = new Document();
-				}
-				ret = dbCol.find(o).projection(fieldFilter).first();
-				exchange1.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
-				return ret;
-			} catch (InvalidPayloadException e) {
-				throw new CamelMongoDbException("Invalid payload for findById", e);
-			}
-		};
-	}
-
-	private Function<Exchange, Object> createDoSave() {
-		return exchange1 -> {
-			try {
-				MongoCollection<Document> dbCol = calculateCollection(exchange1);
-				Document saveObj = exchange1.getIn().getMandatoryBody(Document.class);
-				UpdateOptions options = new UpdateOptions().upsert(true);
-				UpdateResult result = null;
-				if (null == saveObj.get(MONGO_ID)) {
-					result = dbCol.replaceOne(Filters.where("false"), saveObj, options);
-					exchange1.getIn().setHeader(OID, result.getUpsertedId().asObjectId().getValue());
-				} else {
-					result = dbCol.replaceOne(eq(MONGO_ID, saveObj.get(MONGO_ID)), saveObj, options);
-					exchange1.getIn().setHeader(OID, saveObj.get(MONGO_ID));
-				}
-				return result;
-			} catch (InvalidPayloadException e) {
-				throw new CamelMongoDbException("Body incorrect type for save", e);
-			}
-		};
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbProducer.class);
+    private final Map<MongoDbOperation, Processor> operations = new HashMap<>();
+    private MongoDbEndpoint endpoint;
+
+    {
+        bind(MongoDbOperation.aggregate, createDoAggregate());
+        bind(MongoDbOperation.command, createDoCommand());
+        bind(MongoDbOperation.count, createDoCount());
+        bind(MongoDbOperation.findAll, createDoFindAll());
+        bind(MongoDbOperation.findById, createDoFindById());
+        bind(MongoDbOperation.findOneByQuery, createDoFindOneByQuery());
+        bind(MongoDbOperation.getColStats, createDoGetColStats());
+        bind(MongoDbOperation.getDbStats, createDoGetDbStats());
+        bind(MongoDbOperation.insert, createDoInsert());
+        bind(MongoDbOperation.remove, createDoRemove());
+        bind(MongoDbOperation.save, createDoSave());
+        bind(MongoDbOperation.update, createDoUpdate());
+    }
+
+    public MongoDbProducer(MongoDbEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        MongoDbOperation operation = endpoint.getOperation();
+        Object header = exchange.getIn().getHeader(OPERATION_HEADER);
+        if (header != null) {
+            LOG.debug("Overriding default operation with operation specified on header: {}", header);
+            try {
+                if (header instanceof MongoDbOperation) {
+                    operation = ObjectHelper.cast(MongoDbOperation.class, header);
+                } else {
+                    // evaluate as a String
+                    operation = MongoDbOperation.valueOf(exchange.getIn().getHeader(OPERATION_HEADER, String.class));
+                }
+            } catch (Exception e) {
+                throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e);
+            }
+        }
+
+        try {
+            invokeOperation(operation, exchange);
+        } catch (Exception e) {
+            throw MongoDbComponent.wrapInCamelMongoDbException(e);
+        }
+
+    }
+
+    /**
+     * Entry method that selects the appropriate MongoDB operation and executes
+     * it
+     *
+     * @param operation
+     * @param exchange
+     */
+    protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception {
+        Processor processor = operations.get(operation);
+        if (processor != null) {
+            processor.process(exchange);
+        } else {
+            throw new CamelMongoDbException("Operation not supported. Value: " + operation);
+        }
+    }
+
+    private MongoDbProducer bind(MongoDbOperation operation, Function<Exchange, Object> mongoDbFunction) {
+        operations.put(operation, wrap(mongoDbFunction, operation));
+        return this;
+    }
+
+    // ----------- MongoDB operations ----------------
+
+    private Document createDbStatsCommand() {
+        return new Document("dbStats", 1).append("scale", 1);
+    }
+
+    private Document createCollStatsCommand(String collectionName) {
+        return new Document("collStats", collectionName);
+    }
+
+    // --------- Convenience methods -----------------------
+    private MongoDatabase calculateDb(Exchange exchange) {
+        // dynamic calculation is an option. In most cases it won't be used and
+        // we should not penalise all users with running this
+        // resolution logic on every Exchange if they won't be using this
+        // functionality at all
+        if (!endpoint.isDynamicity()) {
+            return endpoint.getMongoDatabase();
+        }
+
+        String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
+        MongoDatabase db;
+
+        if (dynamicDB == null) {
+            db = endpoint.getMongoDatabase();
+        } else {
+            db = endpoint.getMongoConnection().getDatabase(dynamicDB);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Dynamic database selected: {}", db.getName());
+        }
+        return db;
+    }
+
+    private String calculateCollectionName(Exchange exchange) {
+        if (!endpoint.isDynamicity()) {
+            return endpoint.getCollection();
+        }
+        String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
+        if (dynamicCollection == null) {
+            return endpoint.getCollection();
+        }
+        return dynamicCollection;
+    }
+
+    private MongoCollection<Document> calculateCollection(Exchange exchange) {
+        // dynamic calculation is an option. In most cases it won't be used and
+        // we should not penalise all users with running this
+        // resolution logic on every Exchange if they won't be using this
+        // functionality at all
+        if (!endpoint.isDynamicity()) {
+            return endpoint.getMongoCollection().withWriteConcern(endpoint.getWriteConcern());
+        }
+
+        String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
+        String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
+
+        @SuppressWarnings("unchecked")
+        List<Bson> dynamicIndex = exchange.getIn().getHeader(COLLECTION_INDEX, List.class);
+
+        MongoCollection<Document> dbCol;
+
+        if (dynamicDB == null && dynamicCollection == null) {
+            dbCol = endpoint.getMongoCollection().withWriteConcern(endpoint.getWriteConcern());
+        } else {
+            MongoDatabase db = calculateDb(exchange);
+
+            if (dynamicCollection == null) {
+                dbCol = db.getCollection(endpoint.getCollection(), Document.class);
+            } else {
+                dbCol = db.getCollection(dynamicCollection, Document.class);
+
+                // on the fly add index
+                if (dynamicIndex == null) {
+                    endpoint.ensureIndex(dbCol, endpoint.createIndex());
+                } else {
+                    endpoint.ensureIndex(dbCol, dynamicIndex);
+                }
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Dynamic database and/or collection selected: {}->{}", endpoint.getDatabase(), endpoint.getCollection());
+        }
+        return dbCol;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private List<Document> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
+        List<Document> documentList = new ArrayList<>(insertList.size());
+        TypeConverter converter = exchange.getContext().getTypeConverter();
+        for (Object item : insertList) {
+            try {
+                Document document = converter.mandatoryConvertTo(Document.class, item);
+                documentList.add(document);
+            } catch (Exception e) {
+                throw new CamelMongoDbException("MongoDB operation = insert, Assuming List variant of MongoDB insert operation, but List contains non-Document items", e);
+            }
+        }
+        return documentList;
+    }
+
+    private boolean isWriteOperation(MongoDbOperation operation) {
+        return MongoDbComponent.WRITE_OPERATIONS.contains(operation);
+    }
+
+    private Processor wrap(Function<Exchange, Object> supplier, MongoDbOperation operation) {
+        return exchange -> {
+            Object result = supplier.apply(exchange);
+            copyHeaders(exchange);
+            moveBodyToOutIfResultIsReturnedAsHeader(exchange, operation);
+            processAndTransferResult(result, exchange, operation);
+        };
+    }
+
+    private void copyHeaders(Exchange exchange) {
+        MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
+    }
+
+    private void moveBodyToOutIfResultIsReturnedAsHeader(Exchange exchange, MongoDbOperation operation) {
+        if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
+            exchange.getOut().setBody(exchange.getIn().getBody());
+        }
+    }
+
+    private void processAndTransferResult(Object result, Exchange exchange, MongoDbOperation operation) {
+        // determine where to set the WriteResult: as the OUT body or as an IN
+        // message header
+        if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
+            exchange.getOut().setHeader(WRITERESULT, result);
+        } else {
+            exchange.getOut().setBody(result);
+        }
+    }
+
+    private Function<Exchange, Object> createDoGetColStats() {
+        return exch -> calculateDb(exch).runCommand(createCollStatsCommand(calculateCollectionName(exch)));
+    }
+
+    private Function<Exchange, Object> createDoFindOneByQuery() {
+        return exch -> {
+            try {
+                MongoCollection<Document> dbCol = calculateCollection(exch);
+
+                Bson query = exch.getIn().getHeader(CRITERIA, Bson.class);
+                if (null == query) {
+                    query = exch.getIn().getMandatoryBody(Bson.class);
+                }
+
+                Bson sortBy = exch.getIn().getHeader(SORT_BY, Bson.class);
+                Bson fieldFilter = exch.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+
+                if (fieldFilter == null) {
+                    fieldFilter = new Document();
+                }
+
+                if (sortBy == null) {
+                    sortBy = new Document();
+                }
+
+                Document ret = dbCol.find(query).projection(fieldFilter).sort(sortBy).first();
+                exch.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+                return ret;
+            } catch (InvalidPayloadException e) {
+                throw new CamelMongoDbException("Payload is no Document", e);
+            }
+        };
+    }
+
+    private Function<Exchange, Object> createDoCount() {
+        return exch -> {
+            Bson query = exch.getIn().getHeader(CRITERIA, Bson.class);
+            if (null == query) {
+                query = exch.getIn().getBody(Bson.class);
+            }
+            if (query == null) {
+                query = new Document();
+            }
+            return calculateCollection(exch).count(query);
+        };
+    }
+
+    private Function<Exchange, Object> createDoFindAll() {
+        return exchange1 -> {
+            Iterable<Document> result;
+            MongoCollection<Document> dbCol = calculateCollection(exchange1);
+            // do not use getMandatoryBody, because if the body is empty we want
+            // to retrieve all objects in the collection
+            Bson query = exchange1.getIn().getHeader(CRITERIA, Bson.class);
+            // do not run around looking for a type converter unless there is a
+            // need for it
+            if (null == query && exchange1.getIn().getBody() != null) {
+                query = exchange1.getIn().getBody(Bson.class);
+            }
+            Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+
+            // get the batch size and number to skip
+            Integer batchSize = exchange1.getIn().getHeader(BATCH_SIZE, Integer.class);
+            Integer numToSkip = exchange1.getIn().getHeader(NUM_TO_SKIP, Integer.class);
+            Integer limit = exchange1.getIn().getHeader(LIMIT, Integer.class);
+            Document sortBy = exchange1.getIn().getHeader(SORT_BY, Document.class);
+            FindIterable<Document> ret;
+            if (query == null && fieldFilter == null) {
+                ret = dbCol.find(new Document());
+            } else if (fieldFilter == null) {
+                ret = dbCol.find(query);
+            } else if (query != null) {
+                ret = dbCol.find(query).projection(fieldFilter);
+            } else {
+                ret = dbCol.find(new Document()).projection(fieldFilter);
+            }
+
+            if (sortBy != null) {
+                ret.sort(sortBy);
+            }
+
+            if (batchSize != null) {
+                ret.batchSize(batchSize);
+            }
+
+            if (numToSkip != null) {
+                ret.skip(numToSkip);
+            }
+
+            if (limit != null) {
+                ret.limit(limit);
+            }
+
+            if (!MongoDbOutputType.MongoIterable.equals(endpoint.getOutputType())) {
+                try {
+                    result = new ArrayList<>();
+                    ret.iterator().forEachRemaining(((List<Document>)result)::add);
+                    exchange1.getOut().setHeader(RESULT_PAGE_SIZE, ((List<Document>)result).size());
+                } finally {
+                    ret.iterator().close();
+                }
+            } else {
+                result = ret;
+            }
+            return result;
+        };
+    }
+
+    private Function<Exchange, Object> createDoInsert() {
+        return exchange1 -> {
+            MongoCollection<Document> dbCol = calculateCollection(exchange1);
+            boolean singleInsert = true;
+            Object insert = exchange1.getIn().getBody(Document.class);
+            // body could not be converted to Document, check to see if it's of
+            // type List<Document>
+            if (insert == null) {
+                insert = exchange1.getIn().getBody(List.class);
+                // if the body of type List was obtained, ensure that all items
+                // are of type Document and cast the List to List<Document>
+                if (insert != null) {
+                    singleInsert = false;
+                    insert = attemptConvertToList((List<?>)insert, exchange1);
+                } else {
+                    throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type Document nor List<Document>");
+                }
+            }
+
+            if (singleInsert) {
+                Document insertObject = Document.class.cast(insert);
+                dbCol.insertOne(insertObject);
+
+                exchange1.getIn().setHeader(OID, insertObject.get(MONGO_ID));
+            } else {
+                @SuppressWarnings("unchecked")
+                List<Document> insertObjects = (List<Document>)insert;
+                dbCol.insertMany(insertObjects);
+                List<Object> objectIdentification = new ArrayList<>(insertObjects.size());
+                objectIdentification.addAll(insertObjects.stream().map(insertObject -> insertObject.get(MONGO_ID)).collect(Collectors.toList()));
+                exchange1.getIn().setHeader(OID, objectIdentification);
+            }
+            return insert;
+        };
+    }
+
+    private Function<Exchange, Object> createDoUpdate() {
+        return exchange1 -> {
+            try {
+                MongoCollection<Document> dbCol = calculateCollection(exchange1);
+
+                Bson updateCriteria = exchange1.getIn().getHeader(CRITERIA, Bson.class);
+                Bson objNew;
+                if (null == updateCriteria) {
+                    @SuppressWarnings("unchecked")
+                    List<Bson> saveObj = exchange1.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class));
+                    if (saveObj.size() != 2) {
+                        throw new CamelMongoDbException("MongoDB operation = insert, failed because body is not a List of Document objects with size = 2");
+                    }
+
+                    updateCriteria = saveObj.get(0);
+                    objNew = saveObj.get(1);
+                } else {
+                    objNew = exchange1.getIn().getMandatoryBody(Bson.class);
+                }
+
+                Boolean multi = exchange1.getIn().getHeader(MULTIUPDATE, Boolean.class);
+                Boolean upsert = exchange1.getIn().getHeader(UPSERT, Boolean.class);
+
+                UpdateResult result;
+                UpdateOptions options = new UpdateOptions();
+                if (upsert != null) {
+                    options.upsert(upsert);
+                }
+
+                if (multi == null || !multi) {
+                    result = dbCol.updateOne(updateCriteria, objNew, options);
+                } else {
+                    result = dbCol.updateMany(updateCriteria, objNew, options);
+                }
+                if (result.isModifiedCountAvailable()) {
+                    exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getModifiedCount());
+                }
+                exchange1.getOut().setHeader(RECORDS_MATCHED, result.getMatchedCount());
+                return result;
+            } catch (InvalidPayloadException e) {
+                throw new CamelMongoDbException("Invalid payload for update", e);
+            }
+        };
+    }
+
+    private Function<Exchange, Object> createDoRemove() {
+        return exchange1 -> {
+            try {
+                MongoCollection<Document> dbCol = calculateCollection(exchange1);
+                Document removeObj = exchange1.getIn().getMandatoryBody(Document.class);
+
+                DeleteResult result = dbCol.deleteMany(removeObj);
+                if (result.wasAcknowledged()) {
+                    exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getDeletedCount());
+                }
+                return result;
+            } catch (InvalidPayloadException e) {
+                throw new CamelMongoDbException("Invalid payload for remove", e);
+            }
+        };
+    }
+
+    private Function<Exchange, Object> createDoAggregate() {
+        return exchange1 -> {
+            try {
+                MongoCollection<Document> dbCol = calculateCollection(exchange1);
+
+                // Impossible with java driver to get the batch size and number
+                // to skip
+                List<Document> dbIterator = new ArrayList<>();
+                AggregateIterable<Document> aggregationResult;
+
+                @SuppressWarnings("unchecked")
+                List<Bson> query = exchange1.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class));
+
+                // Allow body to be a pipeline
+                // @see http://docs.mongodb.org/manual/core/aggregation/
+                if (null != query) {
+                    List<Bson> queryList = query.stream().map(o -> (Bson)o).collect(Collectors.toList());
+                    aggregationResult = dbCol.aggregate(queryList);
+                } else {
+                    List<Bson> queryList = new ArrayList<>();
+                    queryList.add(Bson.class.cast(exchange1.getIn().getMandatoryBody(Bson.class)));
+                    aggregationResult = dbCol.aggregate(queryList);
+                }
+                aggregationResult.iterator().forEachRemaining(dbIterator::add);
+                return dbIterator;
+            } catch (InvalidPayloadException e) {
+                throw new CamelMongoDbException("Invalid payload for aggregate", e);
+            }
+        };
+    }
+
+    private Function<Exchange, Object> createDoCommand() {
+        return exchange1 -> {
+            try {
+                MongoDatabase db = calculateDb(exchange1);
+                Document cmdObj = exchange1.getIn().getMandatoryBody(Document.class);
+                return db.runCommand(cmdObj);
+            } catch (InvalidPayloadException e) {
+                throw new CamelMongoDbException("Invalid payload for command", e);
+            }
+        };
+    }
+
+    private Function<Exchange, Object> createDoGetDbStats() {
+        return exchange1 -> calculateDb(exchange1).runCommand(createDbStatsCommand());
+    }
+
+    private Function<Exchange, Object> createDoFindById() {
+        return exchange1 -> {
+            try {
+                MongoCollection<Document> dbCol = calculateCollection(exchange1);
+                Object id = exchange1.getIn().getMandatoryBody();
+                Bson o = Filters.eq(MONGO_ID, id);
+                Document ret;
+
+                Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+                if (fieldFilter == null) {
+                    fieldFilter = new Document();
+                }
+                ret = dbCol.find(o).projection(fieldFilter).first();
+                exchange1.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+                return ret;
+            } catch (InvalidPayloadException e) {
+                throw new CamelMongoDbException("Invalid payload for findById", e);
+            }
+        };
+    }
+
+    private Function<Exchange, Object> createDoSave() {
+        return exchange1 -> {
+            try {
+                MongoCollection<Document> dbCol = calculateCollection(exchange1);
+                Document saveObj = exchange1.getIn().getMandatoryBody(Document.class);
+                UpdateOptions options = new UpdateOptions().upsert(true);
+                UpdateResult result = null;
+                if (null == saveObj.get(MONGO_ID)) {
+                    result = dbCol.replaceOne(Filters.where("false"), saveObj, options);
+                    exchange1.getIn().setHeader(OID, result.getUpsertedId().asObjectId().getValue());
+                } else {
+                    result = dbCol.replaceOne(eq(MONGO_ID, saveObj.get(MONGO_ID)), saveObj, options);
+                    exchange1.getIn().setHeader(OID, saveObj.get(MONGO_ID));
+                }
+                return result;
+            } catch (InvalidPayloadException e) {
+                throw new CamelMongoDbException("Body incorrect type for save", e);
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
index 4eba81f..5fba2b9 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
@@ -17,10 +17,10 @@
 package org.apache.camel.component.mongodb3;
 
 public class MongoDbTailTrackingConfig {
-    
+
     public static final String DEFAULT_COLLECTION = "camelTailTracking";
     public static final String DEFAULT_FIELD = "lastTrackingValue";
-    
+
     /**
      * See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)}
      */
@@ -45,9 +45,9 @@ public class MongoDbTailTrackingConfig {
      * See {@link MongoDbEndpoint#setPersistentId(String)}
      */
     public final String persistentId;
-    
-    public MongoDbTailTrackingConfig(boolean persistentTailTracking, String tailTrackIncreasingField, String tailTrackDb,
-            String tailTrackCollection, String tailTrackField, String persistentId) {
+
+    public MongoDbTailTrackingConfig(boolean persistentTailTracking, String tailTrackIncreasingField, String tailTrackDb, String tailTrackCollection, String tailTrackField,
+                                     String persistentId) {
         this.increasingField = tailTrackIncreasingField;
         this.persistent = persistentTailTracking;
         this.db = tailTrackDb;
@@ -55,4 +55,4 @@ public class MongoDbTailTrackingConfig {
         this.collection = tailTrackCollection == null ? DEFAULT_COLLECTION : tailTrackCollection;
         this.field = tailTrackField == null ? DEFAULT_FIELD : tailTrackField;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
index e34643d..1c81d1e 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
@@ -16,38 +16,38 @@
  */
 package org.apache.camel.component.mongodb3;
 
-import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Updates;
 
 import org.bson.Document;
 import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.mongodb.MongoClient;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.Updates;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
 
 public class MongoDbTailTrackingManager {
 
-	private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
-    
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
+
     public Object lastVal;
 
     private final MongoClient connection;
     private final MongoDbTailTrackingConfig config;
     private MongoCollection<Document> dbCol;
     private Document trackingObj;
-    
+
     public MongoDbTailTrackingManager(MongoClient connection, MongoDbTailTrackingConfig config) {
         this.connection = connection;
         this.config = config;
     }
-    
+
     public void initialize() throws Exception {
         if (!config.persistent) {
             return;
         }
-        
+
         dbCol = connection.getDatabase(config.db).getCollection(config.collection, Document.class);
         Document filter = new Document("persistentId", config.persistentId);
         trackingObj = dbCol.find(filter).first();
@@ -55,46 +55,47 @@ public class MongoDbTailTrackingManager {
             dbCol.insertOne(filter);
             trackingObj = dbCol.find(filter).first();
         }
-        // keep only the _id, the rest is useless and causes more overhead during update
+        // keep only the _id, the rest is useless and causes more overhead
+        // during update
         trackingObj = new Document(MONGO_ID, trackingObj.get(MONGO_ID));
     }
-    
+
     public synchronized void persistToStore() {
         if (!config.persistent || lastVal == null) {
             return;
         }
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection);
         }
-        
+
         Bson updateObj = Updates.set(config.field, lastVal);
         dbCol.updateOne(trackingObj, updateObj);
         trackingObj = dbCol.find().first();
     }
-    
+
     public synchronized Object recoverFromStore() {
         if (!config.persistent) {
             return null;
         }
-        
+
         lastVal = dbCol.find(trackingObj).first().get(config.field);
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection);
         }
-        
+
         return lastVal;
     }
-    
+
     public void setLastVal(Document dbObj) {
         if (config.increasingField == null) {
             return;
         }
-        
+
         lastVal = dbObj.get(config.increasingField);
     }
-    
+
     public String getIncreasingFieldName() {
         return config.increasingField;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
index 69c8b7f..e0a2f9f 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
@@ -55,11 +55,11 @@ public class MongoDbTailableCursorConsumer extends DefaultConsumer {
         tailingProcess.initializeProcess();
         executor.execute(tailingProcess);
     }
-    
+
     protected MongoDbTailTrackingManager initTailTracking() throws Exception {
         MongoDbTailTrackingManager answer = new MongoDbTailTrackingManager(endpoint.getMongoConnection(), endpoint.getTailTrackingConfig());
         answer.initialize();
         return answer;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
index f51dd48..f5c9b69 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
@@ -14,213 +14,201 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.mongodb3;
 
-import static com.mongodb.client.model.Filters.gt;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
-
 import java.util.concurrent.CountDownLatch;
 
+import com.mongodb.CursorType;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
 import org.apache.camel.Exchange;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.mongodb.CursorType;
-import com.mongodb.MongoCursorNotFoundException;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
+import static com.mongodb.client.model.Filters.gt;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
 
 public class MongoDbTailingProcess implements Runnable {
 
-	private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
-	private static final String CAPPED_KEY = "capped";
-
-	public volatile boolean keepRunning = true;
-	public volatile boolean stopped; // = false
-	private volatile CountDownLatch stoppedLatch;
-
-	private final MongoCollection<Document> dbCol;
-	private final MongoDbEndpoint endpoint;
-	private final MongoDbTailableCursorConsumer consumer;
-
-	// create local, final copies of these variables for increased performance
-	private final long cursorRegenerationDelay;
-	private final boolean cursorRegenerationDelayEnabled;
-
-	private MongoCursor<Document> cursor;
-	private MongoDbTailTrackingManager tailTracking;
-
-	public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer,
-			MongoDbTailTrackingManager tailTrack) {
-		this.endpoint = endpoint;
-		this.consumer = consumer;
-		this.dbCol = endpoint.getMongoCollection();
-		this.tailTracking = tailTrack;
-		this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
-		this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
-	}
-
-	public MongoCursor<Document> getCursor() {
-		return cursor;
-	}
-
-	/**
-	 * Initialise the tailing process, the cursor and if persistent tail
-	 * tracking is enabled, recover the cursor from the persisted point. As part
-	 * of the initialisation process, the component will validate that the
-	 * collection we are targeting is 'capped'.
-	 *
-	 * @throws Exception
-	 */
-	public void initializeProcess() throws Exception {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}",
-					"db: " + endpoint.getMongoDatabase() + ", col: " + endpoint.getCollection());
-		}
-
-		if (!isCollectionCapped()) {
-			throw new CamelMongoDbException(
-					"Tailable cursors are only compatible with capped collections, and collection "
-							+ endpoint.getCollection() + " is not capped");
-		}
-		try {
-			// recover the last value from the store if it exists
-			tailTracking.recoverFromStore();
-			cursor = initializeCursor();
-		} catch (Exception e) {
-			throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
-		}
-
-		if (cursor == null) {
-			throw new CamelMongoDbException(
-					"Tailable cursor was not initialized, or cursor returned is dead on arrival");
-		}
-
-	}
-
-	private Boolean isCollectionCapped() {
-		return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
-	}
-
-	private Document createCollStatsCommand() {
-		return new Document("collStats", endpoint.getCollection());
-	}
-
-	/**
-	 * The heart of the tailing process.
-	 */
-	@Override
-	public void run() {
-		stoppedLatch = new CountDownLatch(1);
-		while (keepRunning) {
-			doRun();
-			// if the previous call didn't return because we have stopped
-			// running, then regenerate the cursor
-			if (keepRunning) {
-				cursor.close();
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal,
-							cursorRegenerationDelay);
-				}
-
-				if (cursorRegenerationDelayEnabled) {
-					try {
-						Thread.sleep(cursorRegenerationDelay);
-					} catch (InterruptedException e) {
-						// ignore
-					}
-				}
-
-				cursor = initializeCursor();
-			}
-		}
-
-		stopped = true;
-		stoppedLatch.countDown();
-	}
-
-	protected void stop() throws Exception {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}",
-					"db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
-		}
-		keepRunning = false;
-		// close the cursor if it's open, so if it is blocked on hasNext() it
-		// will return immediately
-		if (cursor != null) {
-			cursor.close();
-		}
-		awaitStopped();
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}",
-					"db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
-		}
-	}
-
-	/**
-	 * The heart of the tailing process.
-	 */
-	private void doRun() {
-		// while the cursor has more values, keepRunning is true and the
-		// cursorId is not 0, which symbolizes that the cursor is dead
-		try {
-			while (cursor.hasNext() && keepRunning) { // cursor.getCursorId() !=
-														// 0 &&
-				Document dbObj = cursor.next();
-				Exchange exchange = endpoint.createMongoDbExchange(dbObj);
-				try {
-					if (LOG.isTraceEnabled()) {
-						LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
-					}
-					consumer.getProcessor().process(exchange);
-				} catch (Exception e) {
-					// do nothing
-				}
-				tailTracking.setLastVal(dbObj);
-			}
-		} catch (MongoCursorNotFoundException e) {
-			// we only log the warning if we are not stopping, otherwise it is
-			// expected because the stop() method kills the cursor just in case
-			// it is blocked
-			// waiting for more data to arrive
-			if (keepRunning) {
-				LOG.debug(
-						"Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.",
-						e);
-			}
-		}
-
-		// the loop finished, persist the lastValue just in case we are shutting
-		// down
-		// TODO: perhaps add a functionality to persist every N records
-		tailTracking.persistToStore();
-	}
-
-	// no arguments, will ask DB what the last updated Id was (checking
-	// persistent storage)
-	private MongoCursor<Document> initializeCursor() {
-		Object lastVal = tailTracking.lastVal;
-		// lastVal can be null if we are initializing and there is no
-		// persistence enabled
-		MongoCursor<Document> answer;
-		if (lastVal == null) {
-			answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
-		} else {
-			try (MongoCursor<Document> iterator = 
-					dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator();) {
-				answer = iterator;
-			}
-		}
-		return answer;
-	}
-
-	private void awaitStopped() throws InterruptedException {
-		if (!stopped) {
-			LOG.info("Going to wait for stopping");
-			stoppedLatch.await();
-		}
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
+    private static final String CAPPED_KEY = "capped";
+
+    public volatile boolean keepRunning = true;
+    public volatile boolean stopped; // = false
+    private volatile CountDownLatch stoppedLatch;
+
+    private final MongoCollection<Document> dbCol;
+    private final MongoDbEndpoint endpoint;
+    private final MongoDbTailableCursorConsumer consumer;
+
+    // create local, final copies of these variables for increased performance
+    private final long cursorRegenerationDelay;
+    private final boolean cursorRegenerationDelayEnabled;
+
+    private MongoCursor<Document> cursor;
+    private MongoDbTailTrackingManager tailTracking;
+
+    public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
+        this.endpoint = endpoint;
+        this.consumer = consumer;
+        this.dbCol = endpoint.getMongoCollection();
+        this.tailTracking = tailTrack;
+        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
+        this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
+    }
+
+    public MongoCursor<Document> getCursor() {
+        return cursor;
+    }
+
+    /**
+     * Initialise the tailing process, the cursor and if persistent tail
+     * tracking is enabled, recover the cursor from the persisted point. As part
+     * of the initialisation process, the component will validate that the
+     * collection we are targeting is 'capped'.
+     *
+     * @throws Exception
+     */
+    public void initializeProcess() throws Exception {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + endpoint.getMongoDatabase() + ", col: " + endpoint.getCollection());
+        }
+
+        if (!isCollectionCapped()) {
+            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + endpoint.getCollection() + " is not capped");
+        }
+        try {
+            // recover the last value from the store if it exists
+            tailTracking.recoverFromStore();
+            cursor = initializeCursor();
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
+        }
+
+        if (cursor == null) {
+            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
+        }
+
+    }
+
+    private Boolean isCollectionCapped() {
+        return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
+    }
+
+    private Document createCollStatsCommand() {
+        return new Document("collStats", endpoint.getCollection());
+    }
+
+    /**
+     * Main consumer loop: repeatedly drains the tailable cursor via
+     * {@link #doRun()}, and whenever the server kills the cursor (while we are
+     * still supposed to be running) regenerates it from the last tracked value,
+     * optionally sleeping first. Signals completion through {@code stoppedLatch}
+     * so {@code stop()} can await an orderly shutdown.
+     */
+    @Override
+    public void run() {
+        stoppedLatch = new CountDownLatch(1);
+        while (keepRunning) {
+            doRun();
+            // if the previous call didn't return because we have stopped
+            // running, then regenerate the cursor
+            if (keepRunning) {
+                cursor.close();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal, cursorRegenerationDelay);
+                }
+
+                if (cursorRegenerationDelayEnabled) {
+                    try {
+                        Thread.sleep(cursorRegenerationDelay);
+                    } catch (InterruptedException e) {
+                        // restore the interrupt status so callers up the stack can
+                        // observe it; loop termination is governed by keepRunning
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+                cursor = initializeCursor();
+            }
+        }
+
+        stopped = true;
+        stoppedLatch.countDown();
+    }
+
+    /**
+     * Stops the tailing loop: clears {@code keepRunning} first, then closes the
+     * cursor so a thread blocked in {@code hasNext()} returns immediately, and
+     * finally waits until the run loop has fully exited. The ordering matters:
+     * the flag must be down before the cursor is closed, otherwise run() would
+     * treat the closed cursor as a regeneration trigger.
+     */
+    protected void stop() throws Exception {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
+        }
+        keepRunning = false;
+        // close the cursor if it's open, so if it is blocked on hasNext() it
+        // will return immediately
+        if (cursor != null) {
+            cursor.close();
+        }
+        awaitStopped();
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
+        }
+    }
+
+    /**
+     * The heart of the tailing process: drains the tailable cursor, dispatching
+     * one exchange per document and recording the last seen value so a
+     * regenerated cursor can resume from that point.
+     */
+    private void doRun() {
+        // while the cursor has more values, keepRunning is true and the
+        // cursorId is not 0, which symbolizes that the cursor is dead
+        try {
+            while (cursor.hasNext() && keepRunning) { // cursor.getCursorId() !=
+                                                      // 0 &&
+                Document dbObj = cursor.next();
+                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+                try {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
+                    }
+                    consumer.getProcessor().process(exchange);
+                } catch (Exception e) {
+                    // a failed exchange must not kill the tailing loop, but it
+                    // should not be swallowed silently either (was: do nothing)
+                    LOG.warn("Exception while processing exchange for ObjectId: " + dbObj.get(MONGO_ID) + ". Continuing with next document.", e);
+                }
+                tailTracking.setLastVal(dbObj);
+            }
+        } catch (MongoCursorNotFoundException e) {
+            // we only log the warning if we are not stopping, otherwise it is
+            // expected because the stop() method kills the cursor just in case
+            // it is blocked
+            // waiting for more data to arrive
+            if (keepRunning) {
+                LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
+            }
+        }
+
+        // the loop finished, persist the lastValue just in case we are shutting
+        // down
+        // TODO: perhaps add a functionality to persist every N records
+        tailTracking.persistToStore();
+    }
+
+    // no arguments, will ask DB what the last updated Id was (checking
+    // persistent storage)
+    /**
+     * Creates a new tailable-await cursor over the collection, positioned after
+     * {@code tailTracking.lastVal} when one was recovered from the store.
+     *
+     * @return a live cursor — ownership passes to the caller, which closes it
+     */
+    private MongoCursor<Document> initializeCursor() {
+        Object lastVal = tailTracking.lastVal;
+        // lastVal can be null if we are initializing and there is no
+        // persistence enabled
+        MongoCursor<Document> answer;
+        if (lastVal == null) {
+            answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
+        } else {
+            // FIXED: the previous try-with-resources closed the cursor before it
+            // was returned, so the caller always received a dead-on-arrival cursor
+            answer = dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator();
+        }
+        return answer;
+    }
+
+    /**
+     * Blocks the calling thread until the run loop has signalled termination;
+     * returns immediately if it has already stopped.
+     */
+    private void awaitStopped() throws InterruptedException {
+        if (stopped) {
+            return;
+        }
+        LOG.info("Going to wait for stopping");
+        stoppedLatch.await();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
index 920d11a..0bf2074 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
 import org.apache.camel.converter.IOConverter;
@@ -45,119 +47,116 @@ import org.bson.json.JsonReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 @Converter
 public final class MongoDbBasicConverters {
 
-	private static final Logger LOG = LoggerFactory.getLogger(MongoDbBasicConverters.class);
-
-	// Jackson's ObjectMapper is thread-safe, so no need to create a pool nor synchronize access to it
-	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-	private MongoDbBasicConverters() {
-	}
-
-	@Converter
-	public static Document fromMapToDocument(Map<String, Object> map) {
-		return new Document(map);
-	}
-
-	@Converter
-	public static Map<String, Object> fromDocumentToMap(Document document) {
-		return document;
-	}
-
-	@Converter
-	public static Document fromStringToDocument(String s) {
-		Document answer = null;
-		try {
-			answer = Document.parse(s);
-		} catch (Exception e) {
-			LOG.warn("String -> Document conversion selected, but the following exception occurred. Returning null.", e);
-		}
-
-		return answer;
-	}
-
-	@Converter
-	public static Document fromFileToDocument(File f, Exchange exchange) throws FileNotFoundException {
-		return fromInputStreamToDocument(new FileInputStream(f), exchange);
-	}
-
-	@Converter
-	public static Document fromInputStreamToDocument(InputStream is, Exchange exchange) {
-		Document answer = null;
-		try {
-			byte[] input = IOConverter.toBytes(is);
-
-			if (isBson(input)) {
-				JsonReader reader = new JsonReader(new String(input));
-				DocumentCodec documentReader = new DocumentCodec();
-
-				answer = documentReader.decode(reader, DecoderContext.builder().build());
-			} else {
-				answer = Document.parse(IOConverter.toString(input, exchange));
-			}
-		} catch (Exception e) {
-			LOG.warn("String -> Document conversion selected, but the following exception occurred. Returning null.", e);
-		} finally {
-			// we need to make sure to close the input stream
-			IOHelper.close(is, "InputStream", LOG);
-		}
-		return answer;
-	}
-
-	/** 
-	 * If the input starts with any number of whitespace characters and then a '{' character, we
-	 * assume it is JSON rather than BSON. There are probably no useful BSON blobs that fit this pattern
-	 */
-	private static boolean isBson(byte[] input) {
-		int i = 0;
-		while (i < input.length) {
-			if (input[i] == '{') {
-				return false;
-			} else if (!Character.isWhitespace(input[i])) {
-				return true;
-			}
-		}
-		return true;
-	}
-
-	@Converter
-	public static Document fromAnyObjectToDocument(Object value) {
-		Document answer;
-		try {
-			@SuppressWarnings("unchecked")
-			Map<String, Object> m = OBJECT_MAPPER.convertValue(value, Map.class);
-			answer = new Document(m);
-		} catch (Exception e) {
-			LOG.warn("Conversion has fallen back to generic Object -> Document, but unable to convert type {}. Returning null. {}",
-					value.getClass().getCanonicalName(), e.getClass().getCanonicalName() + ": " + e.getMessage());
-			return null;
-		}
-		return answer;
-	}
-
-	@Converter
-	public static List<Bson> fromStringToList(String value) {
-
-		final CodecRegistry codecRegistry = CodecRegistries.fromProviders(Arrays.asList(
-				new ValueCodecProvider(),
-				new BsonValueCodecProvider(),
-				new DocumentCodecProvider()));
-
-		JsonReader reader = new JsonReader(value);
-		BsonArrayCodec arrayReader = new BsonArrayCodec(codecRegistry);
-
-		BsonArray docArray = arrayReader.decode(reader, DecoderContext.builder().build());
-		
-		List<Bson> answer = new ArrayList<>(docArray.size());
-
-		for (BsonValue doc : docArray) {
-			answer.add(doc.asDocument());
-		}
-		return answer;
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbBasicConverters.class);
+
+    // Jackson's ObjectMapper is thread-safe, so no need to create a pool nor
+    // synchronize access to it
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private MongoDbBasicConverters() {
+    }
+
+    /**
+     * Converts a {@code Map<String, Object>} to a BSON {@link Document} via the
+     * {@code Document(Map)} constructor.
+     */
+    @Converter
+    public static Document fromMapToDocument(Map<String, Object> map) {
+        return new Document(map);
+    }
+
+    /**
+     * Converts a {@link Document} to a Map. Returns the very same instance
+     * ({@code Document} is assignable to {@code Map<String, Object>}), so no
+     * copy is made and mutations are visible through both references.
+     */
+    @Converter
+    public static Map<String, Object> fromDocumentToMap(Document document) {
+        return document;
+    }
+
+    /**
+     * Parses a JSON string into a {@link Document}. Any parse failure is logged
+     * as a warning and {@code null} is returned instead of propagating.
+     */
+    @Converter
+    public static Document fromStringToDocument(String s) {
+        try {
+            return Document.parse(s);
+        } catch (Exception e) {
+            LOG.warn("String -> Document conversion selected, but the following exception occurred. Returning null.", e);
+            return null;
+        }
+    }
+
+    /**
+     * Reads a file into a {@link Document} by delegating to
+     * {@link #fromInputStreamToDocument(InputStream, Exchange)}, which closes
+     * the stream in its finally block.
+     *
+     * @throws FileNotFoundException if the file does not exist or cannot be opened
+     */
+    @Converter
+    public static Document fromFileToDocument(File f, Exchange exchange) throws FileNotFoundException {
+        return fromInputStreamToDocument(new FileInputStream(f), exchange);
+    }
+
+    /**
+     * Converts the full contents of an InputStream to a {@link Document},
+     * choosing the decode path via {@link #isBson(byte[])}. The stream is always
+     * closed. Returns {@code null} on any conversion failure (logged as a warning).
+     */
+    @Converter
+    public static Document fromInputStreamToDocument(InputStream is, Exchange exchange) {
+        Document answer = null;
+        try {
+            byte[] input = IOConverter.toBytes(is);
+
+            if (isBson(input)) {
+                // NOTE(review): new String(input) uses the platform default charset —
+                // confirm whether UTF-8 should be forced for cross-platform consistency
+                JsonReader reader = new JsonReader(new String(input));
+                DocumentCodec documentReader = new DocumentCodec();
+
+                answer = documentReader.decode(reader, DecoderContext.builder().build());
+            } else {
+                answer = Document.parse(IOConverter.toString(input, exchange));
+            }
+        } catch (Exception e) {
+            // fixed: message previously said "String -> Document", copied from the String converter
+            LOG.warn("InputStream -> Document conversion selected, but the following exception occurred. Returning null.", e);
+        } finally {
+            // we need to make sure to close the input stream
+            IOHelper.close(is, "InputStream", LOG);
+        }
+        return answer;
+    }
+
+    /**
+     * Heuristic content sniffing: if the first non-whitespace byte is '{' the
+     * payload is assumed to be JSON rather than BSON. An all-whitespace or empty
+     * payload falls through to the BSON path.
+     *
+     * @return true when the input should be decoded as BSON, false when it looks like JSON
+     */
+    private static boolean isBson(byte[] input) {
+        int i = 0;
+        while (i < input.length) {
+            if (input[i] == '{') {
+                return false;
+            } else if (!Character.isWhitespace(input[i])) {
+                return true;
+            }
+            // FIXED: the index was never advanced, so any leading whitespace
+            // byte caused an infinite loop
+            i++;
+        }
+        return true;
+    }
+
+    /**
+     * Fallback converter: turns an arbitrary object into a {@link Document} by
+     * first converting it to a Map with Jackson's ObjectMapper. Returns
+     * {@code null} (with a warning) when Jackson cannot convert the value.
+     */
+    @Converter
+    public static Document fromAnyObjectToDocument(Object value) {
+        Document answer;
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> m = OBJECT_MAPPER.convertValue(value, Map.class);
+            answer = new Document(m);
+        } catch (Exception e) {
+            LOG.warn("Conversion has fallen back to generic Object -> Document, but unable to convert type {}. Returning null. {}", value.getClass().getCanonicalName(),
+                     e.getClass().getCanonicalName() + ": " + e.getMessage());
+            return null;
+        }
+        return answer;
+    }
+
+    /**
+     * Parses a JSON array string into a list of BSON filter documents, using a
+     * codec registry of value, BsonValue and Document codec providers.
+     * NOTE(review): asDocument() presumably rejects non-document array elements
+     * (e.g. a bare number) — confirm expected input is always an array of documents.
+     */
+    @Converter
+    public static List<Bson> fromStringToList(String value) {
+
+        final CodecRegistry codecRegistry = CodecRegistries.fromProviders(Arrays.asList(new ValueCodecProvider(), new BsonValueCodecProvider(), new DocumentCodecProvider()));
+
+        JsonReader reader = new JsonReader(value);
+        BsonArrayCodec arrayReader = new BsonArrayCodec(codecRegistry);
+
+        BsonArray docArray = arrayReader.decode(reader, DecoderContext.builder().build());
+
+        List<Bson> answer = new ArrayList<>(docArray.size());
+
+        for (BsonValue doc : docArray) {
+            answer.add(doc.asDocument());
+        }
+        return answer;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
index 40042b5..f1e9c2c 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
@@ -16,8 +16,10 @@
  */
 package org.apache.camel.component.mongodb3.processor.idempotent;
 
-import static com.mongodb.client.model.Filters.eq;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.result.DeleteResult;
 
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
@@ -27,14 +29,12 @@ import org.apache.camel.util.ObjectHelper;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
-import com.mongodb.ErrorCategory;
-import com.mongodb.MongoClient;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.result.DeleteResult;
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
 
 @ManagedResource(description = "Mongo db based message id repository")
 public class MongoDbIdempotentRepository<E> extends ServiceSupport implements IdempotentRepository<E> {
-	private MongoClient mongoClient;
+    private MongoClient mongoClient;
     private String collectionName;
     private String dbName;
     private MongoCollection<Document> collection;
@@ -68,7 +68,7 @@ public class MongoDbIdempotentRepository<E> extends ServiceSupport implements Id
     @Override
     public boolean contains(E key) {
         Bson document = eq(MONGO_ID, key);
-        long count =  collection.count(document);
+        long count = collection.count(document);
         return count > 0;
     }
 
@@ -77,7 +77,7 @@ public class MongoDbIdempotentRepository<E> extends ServiceSupport implements Id
     public boolean remove(E key) {
         Bson document = eq(MONGO_ID, key);
         DeleteResult res = collection.deleteOne(document);
-        return  res.getDeletedCount() > 0;
+        return res.getDeletedCount() > 0;
     }
 
     @Override
@@ -131,4 +131,3 @@ public class MongoDbIdempotentRepository<E> extends ServiceSupport implements Id
         this.dbName = dbName;
     }
 }
-

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
index 52bdd6d..dde6e1f 100644
--- a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
@@ -18,9 +18,12 @@ package org.apache.camel.component.mongodb3;
 
 import java.util.Formatter;
 
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
-import org.apache.camel.component.mongodb3.CamelMongoDbException;
 import org.apache.camel.component.properties.PropertiesComponent;
 import org.apache.camel.spring.SpringCamelContext;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -30,19 +33,13 @@ import org.bson.Document;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.AnnotationConfigApplicationContext;
 
-import com.mongodb.MongoClient;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-
-
-
 public abstract class AbstractMongoDbTest extends CamelTestSupport {
 
     protected static MongoClient mongo;
     protected static MongoDatabase db;
     protected static MongoCollection<Document> testCollection;
     protected static MongoCollection<Document> dynamicCollection;
-    
+
     protected static String dbName = "test";
     protected static String testCollectionName;
     protected static String dynamicCollectionName;
@@ -54,13 +51,14 @@ public abstract class AbstractMongoDbTest extends CamelTestSupport {
         mongo = applicationContext.getBean("myDb", MongoClient.class);
         db = mongo.getDatabase(dbName);
 
-        // Refresh the test collection - drop it and recreate it. We don't do this for the database because MongoDB would create large
+        // Refresh the test collection - drop it and recreate it. We don't do
+        // this for the database because MongoDB would create large
         // store files each time
         testCollectionName = "camelTest";
         testCollection = db.getCollection(testCollectionName, Document.class);
         testCollection.drop();
         testCollection = db.getCollection(testCollectionName, Document.class);
-        
+
         dynamicCollectionName = testCollectionName.concat("Dynamic");
         dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
         dynamicCollection.drop();
@@ -80,7 +78,7 @@ public abstract class AbstractMongoDbTest extends CamelTestSupport {
     protected CamelContext createCamelContext() throws Exception {
         applicationContext = new AnnotationConfigApplicationContext(EmbedMongoConfiguration.class);
         @SuppressWarnings("deprecation")
-		CamelContext ctx = SpringCamelContext.springCamelContext(applicationContext);
+        CamelContext ctx = SpringCamelContext.springCamelContext(applicationContext);
         PropertiesComponent pc = new PropertiesComponent("classpath:mongodb.test.properties");
         ctx.addComponent("properties", pc);
         return ctx;
@@ -102,7 +100,7 @@ public abstract class AbstractMongoDbTest extends CamelTestSupport {
     protected CamelMongoDbException extractAndAssertCamelMongoDbException(Object result, String message) {
         assertTrue("Result is not an Exception", result instanceof Throwable);
         assertTrue("Result is not an CamelExecutionException", result instanceof CamelExecutionException);
-        Throwable exc = ((CamelExecutionException) result).getCause();
+        Throwable exc = ((CamelExecutionException)result).getCause();
         assertTrue("Result is not an CamelMongoDbException", exc instanceof CamelMongoDbException);
         CamelMongoDbException camelExc = ObjectHelper.cast(CamelMongoDbException.class, exc);
         if (message != null) {
@@ -111,4 +109,4 @@ public abstract class AbstractMongoDbTest extends CamelTestSupport {
         return camelExc;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
index 03b34c3..50c119c 100644
--- a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
@@ -16,17 +16,9 @@
  */
 package org.apache.camel.component.mongodb3;
 
-import static com.mongodb.MongoClientOptions.builder;
-import static de.flapdoodle.embed.mongo.distribution.Version.Main.PRODUCTION;
-import static de.flapdoodle.embed.process.runtime.Network.localhostIsIPv6;
-import static org.springframework.util.SocketUtils.findAvailableTcpPort;
-
 import java.io.IOException;
 import java.net.UnknownHostException;
 
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.ReadPreference;
@@ -38,6 +30,14 @@ import de.flapdoodle.embed.mongo.config.IMongodConfig;
 import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
 import de.flapdoodle.embed.mongo.config.Net;
 
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import static com.mongodb.MongoClientOptions.builder;
+import static de.flapdoodle.embed.mongo.distribution.Version.Main.PRODUCTION;
+import static de.flapdoodle.embed.process.runtime.Network.localhostIsIPv6;
+import static org.springframework.util.SocketUtils.findAvailableTcpPort;
+
 @Configuration
 public class EmbedMongoConfiguration {
 
@@ -45,10 +45,7 @@ public class EmbedMongoConfiguration {
 
     static {
         try {
-            IMongodConfig mongodConfig = new MongodConfigBuilder()
-                    .version(PRODUCTION)
-                    .net(new Net(PORT, localhostIsIPv6()))
-                    .build();
+            IMongodConfig mongodConfig = new MongodConfigBuilder().version(PRODUCTION).net(new Net(PORT, localhostIsIPv6())).build();
             MongodExecutable mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
             mongodExecutable.start();
         } catch (IOException e) {
@@ -63,37 +60,37 @@ public class EmbedMongoConfiguration {
 
     @Bean
     public MongoClient myDbP() throws UnknownHostException {
-		MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.primary()));
-		return new MongoClient(uri);
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.primary()));
+        return new MongoClient(uri);
     }
 
     @Bean
     public MongoClient myDbPP() throws UnknownHostException {
-		MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.primaryPreferred()));
-		return new MongoClient(uri);
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.primaryPreferred()));
+        return new MongoClient(uri);
     }
 
     @Bean
     public MongoClient myDbS() throws UnknownHostException {
-		MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.secondary()));
-		return new MongoClient(uri);
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.secondary()));
+        return new MongoClient(uri);
     }
 
     @Bean
     public MongoClient myDbWCA() throws UnknownHostException {
-		MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().writeConcern(WriteConcern.ACKNOWLEDGED));
-		return new MongoClient(uri);
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().writeConcern(WriteConcern.ACKNOWLEDGED));
+        return new MongoClient(uri);
     }
 
     @Bean
     public MongoClient myDbSP() throws UnknownHostException {
-        MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.secondaryPreferred()));
-		return new MongoClient(uri);
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.secondaryPreferred()));
+        return new MongoClient(uri);
     }
 
     @Bean
     public MongoClient myDbN() throws UnknownHostException {
-		MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.nearest()));
-		return new MongoClient(uri);
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.nearest()));
+        return new MongoClient(uri);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5715fce7/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
index 33b7eeb..a5b94f4 100644
--- a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
@@ -24,4 +24,4 @@ import org.springframework.context.annotation.ImportResource;
 @Import(EmbedMongoConfiguration.class)
 @ImportResource("org/apache/camel/component/mongodb3/mongoBasicOperationsTest.xml")
 public class MongoBasicOperationsConfiguration {
-}
\ No newline at end of file
+}


Mime
View raw message