drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/2] drill git commit: DRILL-4242: Updates to storage-mongo
Date Mon, 04 Jan 2016 02:17:45 GMT
DRILL-4242: Updates to storage-mongo

+ Depends on the latest Mongo Java Driver (3.2.0)
+ Uses MongoDatabase rather than the deprecated DB class
+ Uses Document and Bson types rather than the legacy DBObject type

This closes #304.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e4372f22
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e4372f22
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e4372f22

Branch: refs/heads/master
Commit: e4372f224a4b474494388356355a53808092a67a
Parents: 5c7a992
Author: Ross Lawley <ross.lawley@gmail.com>
Authored: Thu Dec 17 13:46:40 2015 +0000
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Jan 3 15:31:13 2016 -0800

----------------------------------------------------------------------
 contrib/storage-mongo/pom.xml                   |  6 +--
 .../exec/store/mongo/MongoFilterBuilder.java    | 12 ++---
 .../exec/store/mongo/MongoRecordReader.java     | 30 ++++++-----
 .../drill/exec/store/mongo/MongoScanSpec.java   |  9 ++--
 .../drill/exec/store/mongo/MongoSubScan.java    | 10 ++--
 .../drill/exec/store/mongo/MongoUtils.java      | 30 +++++------
 .../exec/store/mongo/config/MongoPStore.java    | 56 ++++++++++----------
 .../store/mongo/config/MongoPStoreProvider.java | 23 ++++----
 .../drill/exec/store/mongo/MongoTestSuit.java   |  4 +-
 9 files changed, 90 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml
index 4ef9263..87e08ef 100644
--- a/contrib/storage-mongo/pom.xml
+++ b/contrib/storage-mongo/pom.xml
@@ -31,7 +31,7 @@
   <properties>
      <mongo.TestSuite>**/MongoTestSuit.class</mongo.TestSuite>
   </properties>
-  
+
   <dependencies>
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
@@ -43,7 +43,7 @@
   <dependency>
     <groupId>org.mongodb</groupId>
     <artifactId>mongo-java-driver</artifactId>
-    <version>3.1.0</version>
+    <version>3.2.0</version>
   </dependency>
 
     <!-- Test dependencie -->
@@ -94,5 +94,5 @@
       </plugin>
     </plugins>
   </build>
-  
+
 </project>

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
index def793a..379f449 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java
@@ -26,11 +26,11 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
 import org.apache.drill.exec.store.mongo.common.MongoCompareOp;
+import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
-import com.mongodb.BasicDBObject;
 
 public class MongoFilterBuilder extends
     AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements
@@ -58,7 +58,7 @@ public class MongoFilterBuilder extends
 
   private MongoScanSpec mergeScanSpecs(String functionName,
       MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) {
-    BasicDBObject newFilter = null;
+    Document newFilter = new Document();
 
     switch (functionName) {
     case "booleanAnd":
@@ -199,15 +199,15 @@ public class MongoFilterBuilder extends
     }
 
     if (compareOp != null) {
-      BasicDBObject queryFilter = new BasicDBObject();
+      Document queryFilter = new Document();
       if (compareOp == MongoCompareOp.IFNULL) {
         queryFilter.put(fieldName,
-            new BasicDBObject(MongoCompareOp.EQUAL.getCompareOp(), null));
+            new Document(MongoCompareOp.EQUAL.getCompareOp(), null));
       } else if (compareOp == MongoCompareOp.IFNOTNULL) {
         queryFilter.put(fieldName,
-            new BasicDBObject(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
+            new Document(MongoCompareOp.NOT_EQUAL.getCompareOp(), null));
       } else {
-        queryFilter.put(fieldName, new BasicDBObject(compareOp.getCompareOp(),
+        queryFilter.put(fieldName, new Document(compareOp.getCompareOp(),
             fieldValue));
       }
       return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 064ec31..8e50c02 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.bson.BsonDocument;
 import org.bson.BsonDocumentReader;
+import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,6 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoCollection;
@@ -63,8 +63,8 @@ public class MongoRecordReader extends AbstractRecordReader {
   private BsonRecordReader bsonReader;
   private VectorContainerWriter writer;
 
-  private BasicDBObject filters;
-  private final BasicDBObject fields;
+  private Document filters;
+  private final Document fields;
 
   private final FragmentContext fragmentContext;
   private OperatorContext operatorContext;
@@ -79,15 +79,16 @@ public class MongoRecordReader extends AbstractRecordReader {
   public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, MongoStoragePlugin plugin) {
 
-    fields = new BasicDBObject();
+    fields = new Document();
     // exclude _id field, if not mentioned by user.
     fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
     setColumns(projectedColumns);
     fragmentContext = context;
     this.plugin = plugin;
-    filters = new BasicDBObject();
-    Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(subScanSpec.getMinFilters(),
-        subScanSpec.getMaxFilters());
+    filters = new Document();
+    Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
+        subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+
     buildFilters(subScanSpec.getFilter(), mergedFilters);
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
@@ -113,18 +114,19 @@ public class MongoRecordReader extends AbstractRecordReader {
     return transformed;
   }
 
-  private void buildFilters(BasicDBObject pushdownFilters, Map<String, List<BasicDBObject>> mergedFilters) {
-    for (Entry<String, List<BasicDBObject>> entry : mergedFilters.entrySet()) {
-      List<BasicDBObject> list = entry.getValue();
+  private void buildFilters(Document pushdownFilters,
+      Map<String, List<Document>> mergedFilters) {
+    for (Entry<String, List<Document>> entry : mergedFilters.entrySet()) {
+      List<Document> list = entry.getValue();
       if (list.size() == 1) {
-        this.filters.putAll(list.get(0).toMap());
+        this.filters.putAll(list.get(0));
       } else {
-        BasicDBObject andQueryFilter = new BasicDBObject();
+        Document andQueryFilter = new Document();
         andQueryFilter.put("$and", list);
-        this.filters.putAll(andQueryFilter.toMap());
+        this.filters.putAll(andQueryFilter);
       }
     }
-    if (pushdownFilters != null && !pushdownFilters.toMap().isEmpty()) {
+    if (pushdownFilters != null && !pushdownFilters.isEmpty()) {
       if (!mergedFilters.isEmpty()) {
         this.filters = MongoUtils.andFilterAtIndex(this.filters, pushdownFilters);
       } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index d380207..20b9b6e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -17,15 +17,16 @@
  */
 package org.apache.drill.exec.store.mongo;
 
+import org.bson.Document;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.mongodb.BasicDBObject;
 
 public class MongoScanSpec {
   private String dbName;
   private String collectionName;
 
-  private BasicDBObject filters;
+  private Document filters;
 
   @JsonCreator
   public MongoScanSpec(@JsonProperty("dbName") String dbName,
@@ -35,7 +36,7 @@ public class MongoScanSpec {
   }
 
   public MongoScanSpec(String dbName, String collectionName,
-      BasicDBObject filters) {
+      Document filters) {
     this.dbName = dbName;
     this.collectionName = collectionName;
     this.filters = filters;
@@ -49,7 +50,7 @@ public class MongoScanSpec {
     return collectionName;
   }
 
-  public BasicDBObject getFilters() {
+  public Document getFilters() {
     return filters;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index f3b7d4f..962ea76 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
-import com.mongodb.BasicDBObject;
 
 @JsonTypeName("mongo-shard-read")
 public class MongoSubScan extends AbstractBase implements SubScan {
@@ -130,7 +130,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     protected Map<String, Object> minFilters;
     protected Map<String, Object> maxFilters;
 
-    protected BasicDBObject filter;
+    protected Document filter;
 
     @JsonCreator
     public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
@@ -138,7 +138,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
         @JsonProperty("hosts") List<String> hosts,
         @JsonProperty("minFilters") Map<String, Object> minFilters,
         @JsonProperty("maxFilters") Map<String, Object> maxFilters,
-        @JsonProperty("filters") BasicDBObject filters) {
+        @JsonProperty("filters") Document filters) {
       this.dbName = dbName;
       this.collectionName = collectionName;
       this.hosts = hosts;
@@ -196,11 +196,11 @@ public class MongoSubScan extends AbstractBase implements SubScan {
       return this;
     }
 
-    public BasicDBObject getFilter() {
+    public Document getFilter() {
       return filter;
     }
 
-    public MongoSubScanSpec setFilter(BasicDBObject filter) {
+    public MongoSubScanSpec setFilter(Document filter) {
       this.filter = filter;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
index a5fb2ad..3b48f86 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java
@@ -24,51 +24,51 @@ import java.util.Map.Entry;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.mongodb.BasicDBObject;
+import org.bson.Document;
 
 public class MongoUtils {
 
-  public static BasicDBObject andFilterAtIndex(BasicDBObject leftFilter,
-      BasicDBObject rightFilter) {
-    BasicDBObject andQueryFilter = new BasicDBObject();
-    List<BasicDBObject> filters = new ArrayList<BasicDBObject>();
+  public static Document andFilterAtIndex(Document leftFilter,
+      Document rightFilter) {
+    Document andQueryFilter = new Document();
+    List<Document> filters = new ArrayList<Document>();
     filters.add(leftFilter);
     filters.add(rightFilter);
     andQueryFilter.put("$and", filters);
     return andQueryFilter;
   }
 
-  public static BasicDBObject orFilterAtIndex(BasicDBObject leftFilter,
-      BasicDBObject rightFilter) {
-    BasicDBObject orQueryFilter = new BasicDBObject();
-    List<BasicDBObject> filters = new ArrayList<BasicDBObject>();
+  public static Document orFilterAtIndex(Document leftFilter,
+       Document rightFilter) {
+    Document orQueryFilter = new Document();
+    List<Document> filters = new ArrayList<Document>();
     filters.add(leftFilter);
     filters.add(rightFilter);
     orQueryFilter.put("$or", filters);
     return orQueryFilter;
   }
 
-  public static Map<String, List<BasicDBObject>> mergeFilters(
+  public static Map<String, List<Document>> mergeFilters(
       Map<String, Object> minFilters, Map<String, Object> maxFilters) {
-    Map<String, List<BasicDBObject>> filters = Maps.newHashMap();
+    Map<String, List<Document>> filters = Maps.newHashMap();
 
     for (Entry<String, Object> entry : minFilters.entrySet()) {
-      List<BasicDBObject> list = filters.get(entry.getKey());
+      List<Document> list = filters.get(entry.getKey());
       if (list == null) {
         list = Lists.newArrayList();
         filters.put(entry.getKey(), list);
       }
-      list.add(new BasicDBObject(entry.getKey(), new BasicDBObject("$gte",
+      list.add(new Document(entry.getKey(), new Document("$gte",
           entry.getValue())));
     }
 
     for (Entry<String, Object> entry : maxFilters.entrySet()) {
-      List<BasicDBObject> list = filters.get(entry.getKey());
+      List<Document> list = filters.get(entry.getKey());
       if (list == null) {
         list = Lists.newArrayList();
         filters.put(entry.getKey(), list);
       }
-      list.add(new BasicDBObject(entry.getKey(), new BasicDBObject("$lt", entry
+      list.add(new Document(entry.getKey(), new Document("$lt", entry
           .getValue())));
     }
     return filters;

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
index ea3a5a2..7883fc9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.mongo.config;
 
-import static org.apache.drill.exec.store.mongo.config.MongoPStoreProvider.pKey;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map.Entry;
@@ -28,14 +26,19 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.mongo.DrillMongoConstants;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.bson.Document;
+import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.mongodb.BasicDBObject;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-import com.mongodb.WriteResult;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.Updates;
+import com.mongodb.client.result.UpdateResult;
+
+import static org.apache.drill.exec.store.mongo.config.MongoPStoreProvider.pKey;
 
 public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
 
@@ -43,9 +46,9 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
 
   private final PStoreConfig<V> config;
 
-  private final DBCollection collection;
+  private final MongoCollection<Document> collection;
 
-  public MongoPStore(PStoreConfig<V> config, DBCollection collection)
+  public MongoPStore(PStoreConfig<V> config, MongoCollection<Document> collection)
       throws IOException {
 //    this.config = config;
 //    this.collection = collection;
@@ -55,10 +58,10 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
   @Override
   public V get(String key) {
     try {
-      DBObject get = new BasicDBObject().append(ID, key);
-      DBCursor cursor = collection.find(get);
-      if (cursor != null && cursor.hasNext()) {
-        return value((byte[]) cursor.next().get(pKey));
+      Bson query = Filters.eq(ID, key);
+      Document document = collection.find(query).first();
+      if (document != null) {
+        return value((byte[]) document.get(pKey));
       } else {
         return null;
       }
@@ -71,10 +74,8 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
   @Override
   public void put(String key, V value) {
     try {
-      DBObject putObj = new BasicDBObject(2);
-      putObj.put(ID, key);
-      putObj.put(pKey, bytes(value));
-      collection.insert(putObj);
+      Document putObj = new Document(ID, key).append(pKey, bytes(value));
+      collection.insertOne(putObj);
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
       throw new DrillRuntimeException(e.getMessage(), e);
@@ -84,11 +85,10 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
   @Override
   public boolean putIfAbsent(String key, V value) {
     try {
-      DBObject check = new BasicDBObject(1).append(ID, key);
-      DBObject putObj = new BasicDBObject(2);
-      putObj.put(pKey, bytes(value));
-      WriteResult wr = collection.update(check, putObj, true, false);
-      return wr.getN() == 1 ? true : false;
+      Bson query = Filters.eq(ID, key);
+      Bson update = Updates.set(pKey, bytes(value));
+      UpdateResult updateResult = collection.updateOne(query, update, new UpdateOptions().upsert(true));
+      return updateResult.getModifiedCount() == 1;
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
       throw new DrillRuntimeException(e.getMessage(), e);
@@ -98,8 +98,8 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
   @Override
   public void delete(String key) {
     try {
-      DBObject delete = new BasicDBObject(1).append(ID, key);
-      collection.remove(delete);
+      Bson query = Filters.eq(ID, key);
+      collection.deleteOne(query);
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
       throw new DrillRuntimeException(e.getMessage(), e);
@@ -129,10 +129,10 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
 
   private class MongoIterator implements Iterator<Entry<String, V>> {
 
-    private DBCursor cursor;
+    private MongoCursor<Document> cursor;
 
     public MongoIterator() {
-      cursor = collection.find();
+      cursor = collection.find().iterator();
     }
 
     @Override
@@ -156,9 +156,9 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
 
   private class DeferredEntry implements Entry<String, V> {
 
-    private DBObject result;
+    private Document result;
 
-    public DeferredEntry(DBObject result) {
+    public DeferredEntry(Document result) {
       this.result = result;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
index 7443c2e..ae9353a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
@@ -18,24 +18,22 @@
 package org.apache.drill.exec.store.mongo.config;
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.store.mongo.DrillMongoConstants;
-import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
+import org.bson.Document;
+import org.bson.conversions.Bson;
 
-import com.mongodb.BasicDBObject;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.WriteConcern;
-import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
-import org.apache.drill.exec.store.sys.local.MapEStore;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Indexes;
 
 public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants {
 
@@ -46,7 +44,7 @@ public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants
 
   private MongoClient client;
 
-  private DBCollection collection;
+  private MongoCollection<Document> collection;
 
   private final String mongoURL;
   private final LocalEStoreProvider localEStoreProvider;
@@ -60,10 +58,9 @@ public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants
   public void start() throws IOException {
     MongoClientURI clientURI = new MongoClientURI(mongoURL);
     client = new MongoClient(clientURI);
-    DB db = client.getDB(clientURI.getDatabase());
-    collection = db.getCollection(clientURI.getCollection());
-    collection.setWriteConcern(WriteConcern.JOURNALED);
-    DBObject index = new BasicDBObject(1).append(pKey, Integer.valueOf(1));
+    MongoDatabase db = client.getDatabase(clientURI.getDatabase());
+    collection = db.getCollection(clientURI.getCollection()).withWriteConcern(WriteConcern.JOURNALED);
+    Bson index = Indexes.ascending(pKey);
     collection.createIndex(index);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e4372f22/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
index 896d4cf..74ff7d0 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
@@ -39,7 +39,7 @@ import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.IndexOptions;
-
+import com.mongodb.client.model.Indexes;
 import de.flapdoodle.embed.mongo.Command;
 import de.flapdoodle.embed.mongo.MongodExecutable;
 import de.flapdoodle.embed.mongo.MongodProcess;
@@ -226,7 +226,7 @@ public class MongoTestSuit implements MongoTestConstants {
     }
     IndexOptions indexOptions = new IndexOptions().unique(true)
         .background(false).name(indexFieldName);
-    Bson keys = new Document(indexFieldName, Integer.valueOf(1));
+    Bson keys = Indexes.ascending(indexFieldName);
     mongoCollection.createIndex(keys, indexOptions);
   }
 


Mime
View raw message