drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/2] drill git commit: DRILL-3478: Implement BSON reader and use that for MongoDB by default
Date Mon, 04 Jan 2016 02:16:32 GMT
Repository: drill
Updated Branches:
  refs/heads/DRILL-3478 [created] e4372f224


DRILL-3478: Implement BSON reader and use that for MongoDB by default


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5c7a992d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5c7a992d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5c7a992d

Branch: refs/heads/DRILL-3478
Commit: 5c7a992d2a271ab38c2cdb79f145b5f9d59592b7
Parents: 76f41e1
Author: akumarb2010 <akumarb2010@gmail.com>
Authored: Sun Dec 13 20:24:05 2015 +0530
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Jan 3 15:00:43 2016 -0800

----------------------------------------------------------------------
 .../exec/store/mongo/MongoRecordReader.java     |  70 ++--
 .../drill/exec/store/mongo/MongoTestBase.java   |  18 +-
 exec/java-exec/pom.xml                          |   5 +
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../drill/exec/store/avro/AvroRecordReader.java |  18 +-
 .../drill/exec/store/avro/MapOrListWriter.java  | 114 ------
 .../drill/exec/store/bson/BsonRecordReader.java | 378 +++++++++++++++++++
 .../exec/store/bson/TestBsonRecordReader.java   | 266 +++++++++++++
 .../src/main/codegen/templates/BaseWriter.java  |  17 +
 .../complex/impl/MapOrListWriterImpl.java       | 112 ++++++
 11 files changed, 844 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 4aa9aaa..064ec31 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -33,10 +33,12 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.bson.BsonRecordReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.bson.Document;
+import org.bson.BsonDocument;
+import org.bson.BsonDocumentReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,10 +56,11 @@ import com.mongodb.client.MongoDatabase;
 public class MongoRecordReader extends AbstractRecordReader {
   private static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
 
-  private MongoCollection<Document> collection;
-  private MongoCursor<Document> cursor;
+  private MongoCollection<BsonDocument> collection;
+  private MongoCursor<BsonDocument> cursor;
 
   private JsonReader jsonReader;
+  private BsonRecordReader bsonReader;
   private VectorContainerWriter writer;
 
   private BasicDBObject filters;
@@ -71,12 +74,10 @@ public class MongoRecordReader extends AbstractRecordReader {
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
   private boolean unionEnabled;
+  private final boolean isBsonRecordReader;
 
-  public MongoRecordReader(
-      MongoSubScan.MongoSubScanSpec subScanSpec,
-      List<SchemaPath> projectedColumns,
-      FragmentContext context,
-      MongoStoragePlugin plugin) {
+  public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+      FragmentContext context, MongoStoragePlugin plugin) {
 
     fields = new BasicDBObject();
     // exclude _id field, if not mentioned by user.
@@ -85,11 +86,13 @@ public class MongoRecordReader extends AbstractRecordReader {
     fragmentContext = context;
     this.plugin = plugin;
     filters = new BasicDBObject();
-    Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(
-        subScanSpec.getMinFilters(), subScanSpec.getMaxFilters());
+    Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(subScanSpec.getMinFilters(),
+        subScanSpec.getMaxFilters());
     buildFilters(subScanSpec.getFilter(), mergedFilters);
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+    isBsonRecordReader = fragmentContext.getOptions().getOption(ExecConstants.MONGO_BSON_RECORD_READER).bool_val;
+    logger.debug("BsonRecordReader is enabled? " + isBsonRecordReader);
     init(subScanSpec);
   }
 
@@ -97,7 +100,7 @@ public class MongoRecordReader extends AbstractRecordReader {
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
     if (!isStarQuery()) {
-      for (SchemaPath column : projectedColumns ) {
+      for (SchemaPath column : projectedColumns) {
         String fieldName = column.getRootSegment().getPath();
         transformed.add(column);
         this.fields.put(fieldName, Integer.valueOf(1));
@@ -110,8 +113,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     return transformed;
   }
 
-  private void buildFilters(BasicDBObject pushdownFilters,
-      Map<String, List<BasicDBObject>> mergedFilters) {
+  private void buildFilters(BasicDBObject pushdownFilters, Map<String, List<BasicDBObject>> mergedFilters) {
     for (Entry<String, List<BasicDBObject>> entry : mergedFilters.entrySet()) {
       List<BasicDBObject> list = entry.getValue();
       if (list.size() == 1) {
@@ -124,8 +126,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     }
     if (pushdownFilters != null && !pushdownFilters.toMap().isEmpty()) {
       if (!mergedFilters.isEmpty()) {
-        this.filters = MongoUtils.andFilterAtIndex(this.filters,
-            pushdownFilters);
+        this.filters = MongoUtils.andFilterAtIndex(this.filters, pushdownFilters);
       } else {
         this.filters = pushdownFilters;
       }
@@ -140,27 +141,35 @@ public class MongoRecordReader extends AbstractRecordReader {
     }
     MongoClient client = plugin.getClient(addresses);
     MongoDatabase db = client.getDatabase(subScanSpec.getDbName());
-    collection = db.getCollection(subScanSpec.getCollectionName());
     this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    collection = db.getCollection(subScanSpec.getCollectionName(), BsonDocument.class);
   }
 
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = context;
     this.writer = new VectorContainerWriter(output, unionEnabled);
-    this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble);
-
+    // Default is BsonReader and all text mode will not be honored in
+    // BsonRecordReader
+    if (isBsonRecordReader) {
+      this.bsonReader = new BsonRecordReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()),
+          readNumbersAsDouble);
+      logger.debug("Initialized BsonRecordReader. ");
+    } else {
+      this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()),
+          enableAllTextMode, false, readNumbersAsDouble);
+      logger.debug(" Initialized JsonRecordReader. ");
+    }
   }
 
   @Override
   public int next() {
-    if(cursor == null){
+    if (cursor == null) {
       logger.info("Filters Applied : " + filters);
       logger.info("Fields Selected :" + fields);
       cursor = collection.find(filters).projection(fields).batchSize(100).iterator();
     }
 
-
     writer.allocate();
     writer.reset();
 
@@ -171,17 +180,25 @@ public class MongoRecordReader extends AbstractRecordReader {
     try {
       while (docCount < BaseValueVector.INITIAL_VALUE_ALLOCATION && cursor.hasNext()) {
         writer.setPosition(docCount);
-        String doc = cursor.next().toJson();
-        jsonReader.setSource(doc.getBytes(Charsets.UTF_8));
-        jsonReader.write(writer);
+        if (isBsonRecordReader) {
+          BsonDocument bsonDocument = cursor.next();
+          bsonReader.write(writer, new BsonDocumentReader(bsonDocument));
+        } else {
+          String doc = cursor.next().toJson();
+          jsonReader.setSource(doc.getBytes(Charsets.UTF_8));
+          jsonReader.write(writer);
+        }
         docCount++;
       }
 
-      jsonReader.ensureAtLeastOneField(writer);
+      if (isBsonRecordReader) {
+        bsonReader.ensureAtLeastOneField(writer);
+      } else {
+        jsonReader.ensureAtLeastOneField(writer);
+      }
 
       writer.setValueCount(docCount);
-      logger.debug("Took {} ms to get {} records",
-          watch.elapsed(TimeUnit.MILLISECONDS), docCount);
+      logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), docCount);
       return docCount;
     } catch (IOException e) {
       String msg = "Failure while reading document. - Parser was at record: " + (docCount + 1);
@@ -194,5 +211,4 @@ public class MongoRecordReader extends AbstractRecordReader {
   public void close() {
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
index cf3ca61..1877a64 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.List;
 
 import org.apache.drill.PlanTestBase;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -39,15 +39,17 @@ public class MongoTestBase extends PlanTestBase implements MongoTestConstants {
     initMongoStoragePlugin();
   }
 
-  public static void initMongoStoragePlugin() throws ExecutionSetupException {
-    final StoragePluginRegistry pluginRegistry = getDrillbitContext()
-        .getStorage();
-    storagePlugin = (MongoStoragePlugin) pluginRegistry
-        .getPlugin(MongoStoragePluginConfig.NAME);
+  public static void initMongoStoragePlugin() throws Exception {
+    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    storagePlugin = (MongoStoragePlugin) pluginRegistry.getPlugin(MongoStoragePluginConfig.NAME);
     storagePluginConfig = storagePlugin.getConfig();
     storagePluginConfig.setEnabled(true);
-    pluginRegistry.createOrUpdate(MongoStoragePluginConfig.NAME,
-        storagePluginConfig, true);
+    pluginRegistry.createOrUpdate(MongoStoragePluginConfig.NAME, storagePluginConfig, true);
+    if (System.getProperty("drill.mongo.tests.bson.reader", "true").equalsIgnoreCase("false")) {
+      testNoResult(String.format("alter session set `%s` = false", ExecConstants.MONGO_BSON_RECORD_READER));
+    } else {
+      testNoResult(String.format("alter session set `%s` = true", ExecConstants.MONGO_BSON_RECORD_READER));
+    }
   }
 
   public List<QueryDataBatch> runMongoSQLWithResults(String sql)

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index efb549c..7d5e4b3 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -119,6 +119,11 @@
       <version>2.4.3</version>
     </dependency>
     <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>3.0.2</version>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-afterburner</artifactId>
       <version>2.4.0</version>

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 44330f0..0198da8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -153,6 +153,8 @@ public interface ExecConstants {
   OptionValidator MONGO_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(MONGO_ALL_TEXT_MODE, false);
   String MONGO_READER_READ_NUMBERS_AS_DOUBLE = "store.mongo.read_numbers_as_double";
   OptionValidator MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(MONGO_READER_READ_NUMBERS_AS_DOUBLE, false);
+  String MONGO_BSON_RECORD_READER = "store.mongo.bson.record.reader";
+  OptionValidator MONGO_BSON_RECORD_READER_VALIDATOR = new BooleanValidator(MONGO_BSON_RECORD_READER, true);
 
   BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator("exec.enable_union_type", false);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6843a21..e54b914 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -101,6 +101,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,
       ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR,
       ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
+      ExecConstants.MONGO_BSON_RECORD_READER_VALIDATOR,
       ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR,
       ExecConstants.SLICE_TARGET_OPTION,
       ExecConstants.AFFINITY_FACTOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index c9cd505..1405aa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -47,12 +47,14 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Stopwatch;
+
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -166,14 +168,14 @@ public class AvroRecordReader extends AbstractRecordReader {
 
     switch (type) {
       case RECORD:
-        process(container, schema, null, new MapOrListWriter(writer.rootAsMap()));
+        process(container, schema, null, new MapOrListWriterImpl(writer.rootAsMap()));
         break;
       default:
         throw new DrillRuntimeException("Root object must be record type. Found: " + type);
     }
   }
 
-  private void process(final Object value, final Schema schema, final String fieldName, MapOrListWriter writer) {
+  private void process(final Object value, final Schema schema, final String fieldName, MapOrListWriterImpl writer) {
     if (value == null) {
       return;
     }
@@ -182,14 +184,14 @@ public class AvroRecordReader extends AbstractRecordReader {
     switch (type) {
       case RECORD:
         // list field of MapOrListWriter will be non null when we want to store array of maps/records.
-        MapOrListWriter _writer = writer;
+        MapOrListWriterImpl _writer = writer;
 
         for (final Schema.Field field : schema.getFields()) {
           if (field.schema().getType() == Schema.Type.RECORD ||
               (field.schema().getType() == Schema.Type.UNION &&
               field.schema().getTypes().get(0).getType() == Schema.Type.NULL &&
               field.schema().getTypes().get(1).getType() == Schema.Type.RECORD)) {
-            _writer = writer.map(field.name());
+              _writer = (MapOrListWriterImpl) writer.map(field.name());
           }
 
           process(((GenericRecord) value).get(field.name()), field.schema(), field.name(), _writer);
@@ -201,9 +203,9 @@ public class AvroRecordReader extends AbstractRecordReader {
         Schema elementSchema = array.getSchema().getElementType();
         Type elementType = elementSchema.getType();
         if (elementType == Schema.Type.RECORD || elementType == Schema.Type.MAP){
-          writer = writer.list(fieldName).listoftmap(fieldName);
+          writer = (MapOrListWriterImpl) writer.list(fieldName).listoftmap(fieldName);
         } else {
-          writer = writer.list(fieldName);
+          writer = (MapOrListWriterImpl) writer.list(fieldName);
         }
         writer.start();
         for (final Object o : array) {
@@ -222,7 +224,7 @@ public class AvroRecordReader extends AbstractRecordReader {
         @SuppressWarnings("unchecked")
         final HashMap<Object, Object> map = (HashMap<Object, Object>) value;
         Schema valueSchema = schema.getValueType();
-        writer = writer.map(fieldName);
+        writer = (MapOrListWriterImpl) writer.map(fieldName);
         writer.start();
         for (Entry<Object, Object> entry : map.entrySet()) {
           process(entry.getValue(), valueSchema, entry.getKey().toString(), writer);
@@ -256,7 +258,7 @@ public class AvroRecordReader extends AbstractRecordReader {
   }
 
   private void processPrimitive(final Object value, final Schema.Type type, final String fieldName,
-                                final MapOrListWriter writer) {
+                                final MapOrListWriterImpl writer) {
     if (value == null) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
deleted file mode 100644
index e74021b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
-import org.apache.drill.exec.vector.complex.writer.BitWriter;
-import org.apache.drill.exec.vector.complex.writer.Float4Writer;
-import org.apache.drill.exec.vector.complex.writer.Float8Writer;
-import org.apache.drill.exec.vector.complex.writer.IntWriter;
-import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
-import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
-
-/**
- * Impersonates a map writer or a list writer depending on construction type.
- * Perhaps this is a tragic misuse of polymorphism?
- */
-public class MapOrListWriter {
-
-  final BaseWriter.MapWriter map;
-  final BaseWriter.ListWriter list;
-
-  MapOrListWriter(final BaseWriter.MapWriter writer) {
-    this.map = writer;
-    this.list = null;
-  }
-
-  MapOrListWriter(final BaseWriter.ListWriter writer) {
-    this.map = null;
-    this.list = writer;
-  }
-
-  void start() {
-    if (map != null) {
-      map.start();
-    } else {
-      list.startList();
-    }
-  }
-
-  void end() {
-    if (map != null) {
-      map.end();
-    } else {
-      list.endList();
-    }
-  }
-
-  MapOrListWriter map(final String name) {
-    assert map != null;
-    return new MapOrListWriter(map.map(name));
-  }
-
-  MapOrListWriter listoftmap(final String name) {
-    assert list != null;
-    return new MapOrListWriter(list.map());
-  }
-
-  MapOrListWriter list(final String name) {
-    assert map != null;
-    return new MapOrListWriter(map.list(name));
-  }
-
-  boolean isMapWriter() {
-    return map != null;
-  }
-
-  boolean isListWriter() {
-    return list != null;
-  }
-
-  VarCharWriter varChar(final String name) {
-    return (map != null) ? map.varChar(name) : list.varChar();
-  }
-
-  IntWriter integer(final String name) {
-    return (map != null) ? map.integer(name) : list.integer();
-  }
-
-  BigIntWriter bigInt(final String name) {
-    return (map != null) ? map.bigInt(name) : list.bigInt();
-  }
-
-  Float4Writer float4(final String name) {
-    return (map != null) ? map.float4(name) : list.float4();
-  }
-
-  Float8Writer float8(final String name) {
-    return (map != null) ? map.float8(name) : list.float8();
-  }
-
-  BitWriter bit(final String name) {
-    return (map != null) ? map.bit(name) : list.bit();
-  }
-
-  VarBinaryWriter binary(final String name) {
-    return (map != null) ? map.varBinary(name) : list.varBinary();
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
new file mode 100644
index 0000000..36462b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.bson;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.drill.exec.vector.complex.writer.DateWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeWriter;
+import org.bson.BsonBinary;
+import org.bson.BsonReader;
+import org.bson.BsonType;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Preconditions;
+
+public class BsonRecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BsonRecordReader.class);
+  public final static int MAX_RECORD_SIZE = 128 * 1024;
+  private final List<SchemaPath> columns;
+  private boolean atLeastOneWrite = false;
+  private final boolean readNumbersAsDouble;
+  protected DrillBuf workBuf;
+  private String currentFieldName;
+
+  public BsonRecordReader(DrillBuf managedBuf, boolean allTextMode, boolean readNumbersAsDouble) {
+    this(managedBuf, GroupScan.ALL_COLUMNS, readNumbersAsDouble);
+  }
+
+  public BsonRecordReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean readNumbersAsDouble) {
+    assert Preconditions.checkNotNull(columns).size() > 0 : "bson record reader requires at least a column";
+    this.readNumbersAsDouble = readNumbersAsDouble;
+    this.workBuf = managedBuf;
+    this.columns = columns;
+  }
+
+  public void write(ComplexWriter writer, BsonReader reader) throws IOException {
+    reader.readStartDocument();
+    BsonType readBsonType = reader.getCurrentBsonType();
+    switch (readBsonType) {
+    case DOCUMENT:
+      writeToListOrMap(reader, new MapOrListWriterImpl(writer.rootAsMap()), false, null);
+      break;
+    default:
+      throw new DrillRuntimeException("Root object must be DOCUMENT type. Found: " + readBsonType);
+    }
+  }
+
+  private void writeToListOrMap(BsonReader reader, final MapOrListWriterImpl writer, boolean isList, String fieldName) {
+    writer.start();
+    // If isList is true, then fieldName can be null as it is not required while
+    // writing
+    while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
+      if (!isList) {
+        fieldName = reader.readName();
+      }
+      BsonType currentBsonType = reader.getCurrentBsonType();
+      switch (currentBsonType) {
+      case INT32:
+        int readInt32 = reader.readInt32();
+        if (readNumbersAsDouble) {
+          writeDouble(readInt32, writer, fieldName, isList);
+        } else {
+          writeInt32(readInt32, writer, fieldName, isList);
+        }
+        atLeastOneWrite = true;
+        break;
+      case INT64:
+        long readInt64 = reader.readInt64();
+        if (readNumbersAsDouble) {
+          writeDouble(readInt64, writer, fieldName, isList);
+        } else {
+          writeInt64(readInt64, writer, fieldName, isList);
+        }
+        atLeastOneWrite = true;
+        break;
+      case ARRAY:
+        reader.readStartArray();
+        writeToListOrMap(reader, (MapOrListWriterImpl) writer.list(fieldName), true, fieldName);
+        atLeastOneWrite = true;
+        break;
+      case BINARY:
+        // handle types
+        writeBinary(reader, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case BOOLEAN:
+        boolean readBoolean = reader.readBoolean();
+        writeBoolean(readBoolean, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case DATE_TIME:
+        long readDateTime = reader.readDateTime();
+        writeDateTime(readDateTime, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case DOCUMENT:
+        reader.readStartDocument();
+        // To handle nested Documents.
+        MapOrListWriterImpl _writer = writer;
+        if (!isList) {
+          _writer = (MapOrListWriterImpl) writer.map(fieldName);
+        } else {
+          _writer = (MapOrListWriterImpl) writer.listoftmap(fieldName);
+        }
+        writeToListOrMap(reader, _writer, false, fieldName);
+        atLeastOneWrite = true;
+        break;
+      case DOUBLE:
+        double readDouble = reader.readDouble();
+        writeDouble(readDouble, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case JAVASCRIPT:
+        final String readJavaScript = reader.readJavaScript();
+        writeString(readJavaScript, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case JAVASCRIPT_WITH_SCOPE:
+        final String readJavaScriptWithScopeString = reader.readJavaScriptWithScope();
+        writeString(readJavaScriptWithScopeString, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case NULL:
+        // just read and ignore.
+        reader.readNull();
+        break;
+      case OBJECT_ID:
+        writeObjectId(reader, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case STRING:
+        final String readString = reader.readString();
+        writeString(readString, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case SYMBOL:
+        final String readSymbol = reader.readSymbol();
+        writeString(readSymbol, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      case TIMESTAMP:
+        int time = reader.readTimestamp().getTime();
+        writeTimeStamp(time, writer, fieldName, isList);
+        atLeastOneWrite = true;
+        break;
+      default:
+        // REGULAR_EXPRESSION and DB_POINTER types are not handled
+        throw new DrillRuntimeException("UnSupported Bson type: " + currentBsonType);
+      }
+    }
+    if (!isList) {
+      reader.readEndDocument();
+    } else {
+      reader.readEndArray();
+    }
+  }
+
+  /**
+   * Decodes a BSON binary element and writes it using the concrete type implied
+   * by its binary subtype tag (double, string-like, boolean, date, int32,
+   * int64, timestamp); any other subtype is stored verbatim as VarBinary.
+   */
+  private void writeBinary(BsonReader reader, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    final VarBinaryHolder vb = new VarBinaryHolder();
+    final BsonBinary readBinaryData = reader.readBinaryData();
+    final byte[] data = readBinaryData.getData();
+    // Binary subtype byte selects how the payload is interpreted.
+    final int type = readBinaryData.getType();
+    switch (type) {
+    case 1:
+      // Double (8 bytes)
+      writeDouble(ByteBuffer.wrap(data).getDouble(), writer, fieldName, isList);
+      break;
+    case 2:  // String
+      // NOTE(review): new String(data) uses the platform default charset while
+      // BSON strings are UTF-8 — confirm and consider an explicit charset.
+    case 13: // JavaScript
+    case 14: // Symbol
+    case 15: // JavaScript (with scope)
+      writeString(new String(data), writer, fieldName, isList);
+      break;
+    case 8:
+      // Boolean: a single non-zero byte means true; empty/absent data is false.
+      boolean boolValue = (data == null || data.length == 0) ? false : data[0] != 0x00;
+      writeBoolean(boolValue, writer, fieldName, isList);
+      break;
+    case 9:
+      // Date: 8-byte millis since epoch
+      writeDateTime(ByteBuffer.wrap(data).getLong(), writer, fieldName, isList);
+      break;
+    case 16:
+      // 32-bit integer
+      writeInt32(ByteBuffer.wrap(data).getInt(), writer, fieldName, isList);
+      break;
+    case 17:
+      // Timestamp
+      // NOTE(review): a BSON timestamp occupies 8 bytes; only the first 4 are
+      // consumed here — confirm the intended layout before changing.
+      writeTimeStamp(ByteBuffer.wrap(data).getInt(), writer, fieldName, isList);
+      break;
+    case 18:
+      // 64-bit integer: must read all 8 bytes (getLong, not getInt).
+      writeInt64(ByteBuffer.wrap(data).getLong(), writer, fieldName, isList);
+      break;
+    default:
+      // Object (3), binary data (5), object id (7) and any unrecognized
+      // subtype are stored as raw VarBinary.
+      writeBinary(writer, fieldName, isList, vb, data);
+      break;
+    }
+  }
+
+  private void writeTimeStamp(int timestamp, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    DateTime dateTime = new DateTime(timestamp);
+    TimeWriter t;
+    if (isList == false) {
+      t = writer.map.time(fieldName);
+    } else {
+      t = writer.list.time();
+    }
+    t.writeTime((int) (dateTime.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis()));
+  }
+
+  /**
+   * Writes a string value as VarChar into the map (by field name) or list.
+   * Encodes to UTF-8 first so the vector length is the byte count — the
+   * original used String.length() (char count), which corrupts/truncates
+   * non-ASCII values where the two differ.
+   */
+  private void writeString(String readString, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    final byte[] bytes;
+    try {
+      bytes = readString.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException("Unable to read string value for field: " + fieldName, e);
+    }
+    final VarCharHolder vh = new VarCharHolder();
+    ensure(bytes.length);
+    workBuf.setBytes(0, bytes);
+    vh.buffer = workBuf;
+    vh.start = 0;
+    vh.end = bytes.length;
+    if (isList) {
+      writer.list.varChar().write(vh);
+    } else {
+      writer.varChar(fieldName).write(vh);
+    }
+  }
+
+  /** Writes a BSON ObjectId as its 12-byte representation via VarBinary. */
+  private void writeObjectId(BsonReader reader, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    final byte[] idBytes = reader.readObjectId().toByteArray();
+    final VarBinaryHolder idHolder = new VarBinaryHolder();
+    writeBinary(writer, fieldName, isList, idHolder, idBytes);
+  }
+
+  /** Writes a double as a Float8 into the map (by field name) or list. */
+  private void writeDouble(double readDouble, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    final Float8Holder holder = new Float8Holder();
+    holder.value = readDouble;
+    if (isList) {
+      writer.list.float8().write(holder);
+    } else {
+      writer.float8(fieldName).write(holder);
+    }
+  }
+
+  private void writeDateTime(long readDateTime, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    DateTime date = new DateTime(readDateTime);
+    DateWriter dt;
+    if (isList == false) {
+      dt = writer.map.date(fieldName);
+    } else {
+      dt = writer.list.date();
+    }
+    dt.writeDate(date.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
+  }
+
+  /** Writes a boolean as a Bit (1/0) into the map (by field name) or list. */
+  private void writeBoolean(boolean readBoolean, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    final BitHolder holder = new BitHolder();
+    holder.value = readBoolean ? 1 : 0;
+    if (isList) {
+      writer.list.bit().write(holder);
+    } else {
+      writer.bit(fieldName).write(holder);
+    }
+  }
+
+  /**
+   * Copies raw bytes into the scratch buffer and writes them as VarBinary into
+   * the map (by field name) or list, populating the supplied holder.
+   */
+  private void writeBinary(final MapOrListWriterImpl writer, String fieldName, boolean isList,
+      final VarBinaryHolder vb, final byte[] bytes) {
+    // Make sure the shared scratch buffer can hold the payload first.
+    ensure(bytes.length);
+    workBuf.setBytes(0, bytes);
+    vb.buffer = workBuf;
+    vb.start = 0;
+    vb.end = bytes.length;
+    if (isList) {
+      writer.list.varBinary().write(vb);
+    } else {
+      writer.binary(fieldName).write(vb);
+    }
+  }
+
+  /** Writes a 64-bit integer as a BigInt into the map (by field name) or list. */
+  private void writeInt64(long readInt64, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    final BigIntHolder holder = new BigIntHolder();
+    holder.value = readInt64;
+    if (isList) {
+      writer.list.bigInt().write(holder);
+    } else {
+      writer.bigInt(fieldName).write(holder);
+    }
+  }
+
+  /** Writes a 32-bit integer as an Int into the map (by field name) or list. */
+  private void writeInt32(int readInt32, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
+    final IntHolder holder = new IntHolder();
+    holder.value = readInt32;
+    if (isList) {
+      writer.list.integer().write(holder);
+    } else {
+      writer.integer(fieldName).write(holder);
+    }
+  }
+
+  /**
+   * Guarantees at least one materialized field exists so that a batch with no
+   * written columns (e.g. for count purposes) still returns a usable shape.
+   */
+  public void ensureAtLeastOneField(ComplexWriter writer) {
+    if (atLeastOneWrite) {
+      return;
+    }
+    // Walk the first projected path, materializing each intermediate map,
+    // then create a dummy integer writer at the leaf segment.
+    SchemaPath path = columns.get(0);
+    PathSegment segment = path.getRootSegment();
+    BaseWriter.MapWriter mapWriter = writer.rootAsMap();
+    while (segment.getChild() != null && !segment.getChild().isArray()) {
+      mapWriter = mapWriter.map(segment.getNameSegment().getPath());
+      segment = segment.getChild();
+    }
+    mapWriter.integer(segment.getNameSegment().getPath());
+  }
+
+  /**
+   * NOTE(review): stub — always returns null, so callers attaching field and
+   * message context receive no builder back. Confirm whether this is an
+   * intentional no-op for this reader or an unimplemented override.
+   */
+  public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String field,
+      String msg, Object... args) {
+    return null;
+  }
+
+  /**
+   * NOTE(review): stub — always returns null and discards the passed Throwable,
+   * losing the original cause. Confirm this is intentional.
+   */
+  public UserException.Builder getExceptionWithContext(Throwable exception, String field, String msg, Object... args) {
+    return null;
+  }
+
+  // Grows the scratch buffer to at least {@code length} bytes.
+  // reallocIfNeeded may return a different buffer, so the field is reassigned.
+  private void ensure(final int length) {
+    workBuf = workBuf.reallocIfNeeded(length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
new file mode 100644
index 0000000..4255924
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/bson/TestBsonRecordReader.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.bson;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.TestOutputMutator;
+import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.bson.BsonBinary;
+import org.bson.BsonBinarySubType;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDocument;
+import org.bson.BsonDocumentReader;
+import org.bson.BsonDocumentWriter;
+import org.bson.BsonDouble;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonObjectId;
+import org.bson.BsonString;
+import org.bson.BsonSymbol;
+import org.bson.BsonTimestamp;
+import org.bson.BsonWriter;
+import org.bson.types.ObjectId;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BsonRecordReader}: each test builds a BsonDocument,
+ * feeds it through the reader into a VectorContainerWriter, and reads the
+ * resulting vectors back to verify type mapping.
+ */
+public class TestBsonRecordReader extends BaseTestQuery {
+  private static VectorContainerWriter writer;
+  private static TestOutputMutator mutator;
+  private static BsonRecordReader bsonReader;
+
+  @BeforeClass
+  public static void setUp() {
+    BufferAllocator bufferAllocator = getDrillbitContext().getAllocator();
+    mutator = new TestOutputMutator(bufferAllocator);
+    writer = new VectorContainerWriter(mutator);
+    bsonReader = new BsonRecordReader(bufferAllocator.buffer(1024), false, false);
+  }
+
+  @Test
+  public void testIntType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("seqNo", new BsonInt64(10));
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals(10l, mapReader.reader("seqNo").readLong().longValue());
+  }
+
+  @Test
+  public void testTimeStampType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("ts", new BsonTimestamp(1000, 10));
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals(1000l, mapReader.reader("ts").readDateTime().getMillis());
+  }
+
+  @Test
+  public void testSymbolType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("symbolKey", new BsonSymbol("test_symbol"));
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals("test_symbol", mapReader.reader("symbolKey").readText().toString());
+  }
+
+  @Test
+  public void testStringType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("stringKey", new BsonString("test_string"));
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals("test_string", mapReader.reader("stringKey").readText().toString());
+  }
+
+  @Test
+  public void testObjectIdType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    BsonObjectId value = new BsonObjectId(new ObjectId());
+    bsonDoc.append("_idKey", value);
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    byte[] readByteArray = mapReader.reader("_idKey").readByteArray();
+    assertTrue(Arrays.equals(value.getValue().toByteArray(), readByteArray));
+  }
+
+  @Test
+  public void testNullType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("nullKey", new BsonNull());
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals(null, mapReader.reader("nullKey").readObject());
+  }
+
+  @Test
+  public void testDoubleType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("doubleKey", new BsonDouble(12.35));
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals(12.35d, mapReader.reader("doubleKey").readDouble().doubleValue(), 0.00001);
+  }
+
+  @Test
+  public void testArrayOfDocumentType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    BsonWriter bw = new BsonDocumentWriter(bsonDoc);
+    bw.writeStartDocument();
+    bw.writeName("a");
+    bw.writeString("MongoDB");
+    bw.writeName("b");
+    bw.writeStartArray();
+    bw.writeStartDocument();
+    bw.writeName("c");
+    bw.writeInt32(1);
+    bw.writeEndDocument();
+    bw.writeEndArray();
+    bw.writeEndDocument();
+    bw.flush();
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals("MongoDB", mapReader.reader("a").readText().toString());
+  }
+
+  @Test
+  public void testRecursiveDocuments() throws IOException {
+    BsonDocument topDoc = new BsonDocument();
+    final int count = 3;
+    for (int i = 0; i < count; ++i) {
+      BsonDocument bsonDoc = new BsonDocument();
+      BsonWriter bw = new BsonDocumentWriter(bsonDoc);
+      bw.writeStartDocument();
+      bw.writeName("k1" + i);
+      bw.writeString("drillMongo1" + i);
+      bw.writeName("k2" + i);
+      bw.writeString("drillMongo2" + i);
+      bw.writeEndDocument();
+      bw.flush();
+      topDoc.append("doc" + i, bsonDoc);
+    }
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(topDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    for (int i = 0; i < count; ++i) {
+      SingleMapReaderImpl reader = (SingleMapReaderImpl) mapReader.reader("doc" + i);
+      assertEquals("drillMongo1" + i, reader.reader("k1" + i).readText().toString());
+      assertEquals("drillMongo2" + i, reader.reader("k2" + i).readText().toString());
+    }
+  }
+
+  @Test
+  public void testDateTimeType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("dateTimeKey", new BsonDateTime(5262729712L));
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertEquals(5262729712L, mapReader.reader("dateTimeKey").readDateTime().getMillis());
+  }
+
+  @Test
+  public void testBooleanType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    bsonDoc.append("booleanKey", new BsonBoolean(true));
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertTrue(mapReader.reader("booleanKey").readBoolean());
+  }
+
+  @Test
+  public void testBinaryTypes() throws IOException {
+    // Exercise binary subtypes: generic binary, string, double and boolean.
+    BsonDocument bsonDoc = new BsonDocument();
+    // Generic binary
+    byte[] bytes = "binaryValue".getBytes();
+    bsonDoc.append("binaryKey", new BsonBinary(BsonBinarySubType.BINARY, bytes));
+    // String (subtype 2)
+    byte[] bytesString = "binaryStringValue".getBytes();
+    bsonDoc.append("binaryStringKey", new BsonBinary((byte) 2, bytesString));
+    // Double (subtype 1)
+    byte[] bytesDouble = new byte[8];
+    java.nio.ByteBuffer.wrap(bytesDouble).putDouble(23.0123);
+    BsonBinary bsonDouble = new BsonBinary((byte) 1, bytesDouble);
+    bsonDoc.append("binaryDouble", bsonDouble);
+    // Boolean (subtype 8)
+    byte[] booleanBytes = new byte[8];
+    java.nio.ByteBuffer.wrap(booleanBytes).put((byte) 1);
+    BsonBinary bsonBoolean = new BsonBinary((byte) 8, booleanBytes);
+    bsonDoc.append("bsonBoolean", bsonBoolean);
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    assertTrue(Arrays.equals(bytes, mapReader.reader("binaryKey").readByteArray()));
+    assertEquals("binaryStringValue", mapReader.reader("binaryStringKey").readText().toString());
+    assertEquals(23.0123, mapReader.reader("binaryDouble").readDouble().doubleValue(), 0);
+    FieldReader reader = mapReader.reader("bsonBoolean");
+    assertEquals(true, reader.readBoolean().booleanValue());
+  }
+
+  @Test
+  public void testArrayType() throws IOException {
+    BsonDocument bsonDoc = new BsonDocument();
+    BsonWriter bw = new BsonDocumentWriter(bsonDoc);
+    bw.writeStartDocument();
+    bw.writeName("arrayKey");
+    bw.writeStartArray();
+    bw.writeInt32(1);
+    bw.writeInt32(2);
+    bw.writeInt32(3);
+    bw.writeEndArray();
+    bw.writeEndDocument();
+    bw.flush();
+    // Reset before writing, like every other test, so state from a previously
+    // run test cannot leak into this one (was missing in the original).
+    writer.reset();
+    bsonReader.write(writer, new BsonDocumentReader(bsonDoc));
+    SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader();
+    FieldReader reader = mapReader.reader("arrayKey");
+    assertEquals(3, reader.size());
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    try {
+      writer.close();
+    } catch (Exception ignored) {
+      // Best-effort cleanup: failing to close the writer should not fail the suite.
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/vector/src/main/codegen/templates/BaseWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/BaseWriter.java b/exec/vector/src/main/codegen/templates/BaseWriter.java
index 8a9ea56..9ca72c2 100644
--- a/exec/vector/src/main/codegen/templates/BaseWriter.java
+++ b/exec/vector/src/main/codegen/templates/BaseWriter.java
@@ -97,4 +97,21 @@ package org.apache.drill.exec.vector.complex.writer;
     void setValueCount(int count);
     void reset();
   }
+
+  /**
+   * Unified writer facade over either a map or a list writer, letting record
+   * readers emit scalar and complex values without branching on the container
+   * kind at every call site.
+   */
+  public interface MapOrListWriter {
+    void start();
+    void end();
+    MapOrListWriter map(String name);
+    // NOTE(review): the list-backed implementation ignores the name argument
+    // (entries go into the list's anonymous map) — confirm intended contract.
+    MapOrListWriter listoftmap(String name);
+    MapOrListWriter list(String name);
+    boolean isMapWriter();
+    boolean isListWriter();
+    // Scalar writers: resolved by field name in map mode, positionally in list mode.
+    VarCharWriter varChar(String name);
+    IntWriter integer(String name);
+    BigIntWriter bigInt(String name);
+    Float4Writer float4(String name);
+    Float8Writer float8(String name);
+    BitWriter bit(String name);
+    VarBinaryWriter binary(String name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c7a992d/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java
new file mode 100644
index 0000000..1cfcdab
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/MapOrListWriterImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.impl;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapOrListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+
+/**
+ * Adapter implementing {@link MapOrListWriter} over exactly one of a
+ * {@link BaseWriter.MapWriter} or a {@link BaseWriter.ListWriter}; the other
+ * field is always null. Map mode resolves fields by name, list mode appends
+ * positionally (name arguments are ignored by the list-backed calls).
+ */
+public class MapOrListWriterImpl implements MapOrListWriter {
+
+  // Exactly one of these is non-null; isMapWriter()/isListWriter() report which.
+  public final BaseWriter.MapWriter map;
+  public final BaseWriter.ListWriter list;
+
+  /** Wraps a map writer; list-mode operations are unavailable. */
+  public MapOrListWriterImpl(final BaseWriter.MapWriter writer) {
+    this.map = writer;
+    this.list = null;
+  }
+
+  /** Wraps a list writer; map-mode operations are unavailable. */
+  public MapOrListWriterImpl(final BaseWriter.ListWriter writer) {
+    this.map = null;
+    this.list = writer;
+  }
+
+  @Override
+  public void start() {
+    if (map != null) {
+      map.start();
+    } else {
+      list.startList();
+    }
+  }
+
+  @Override
+  public void end() {
+    if (map != null) {
+      map.end();
+    } else {
+      list.endList();
+    }
+  }
+
+  /** Creates/returns a nested map under {@code name}; map mode only. */
+  @Override
+  public MapOrListWriter map(final String name) {
+    assert map != null;
+    return new MapOrListWriterImpl(map.map(name));
+  }
+
+  /**
+   * Returns a map writer for the next list element; list mode only.
+   * Note: {@code name} is unused — list elements are anonymous.
+   */
+  @Override
+  public MapOrListWriter listoftmap(final String name) {
+    assert list != null;
+    return new MapOrListWriterImpl(list.map());
+  }
+
+  /** Creates/returns a nested list under {@code name}; map mode only. */
+  @Override
+  public MapOrListWriter list(final String name) {
+    assert map != null;
+    return new MapOrListWriterImpl(map.list(name));
+  }
+
+  @Override
+  public boolean isMapWriter() {
+    return map != null;
+  }
+
+  @Override
+  public boolean isListWriter() {
+    return list != null;
+  }
+
+  @Override
+  public VarCharWriter varChar(final String name) {
+    return (map != null) ? map.varChar(name) : list.varChar();
+  }
+
+  @Override
+  public IntWriter integer(final String name) {
+    return (map != null) ? map.integer(name) : list.integer();
+  }
+
+  @Override
+  public BigIntWriter bigInt(final String name) {
+    return (map != null) ? map.bigInt(name) : list.bigInt();
+  }
+
+  @Override
+  public Float4Writer float4(final String name) {
+    return (map != null) ? map.float4(name) : list.float4();
+  }
+
+  @Override
+  public Float8Writer float8(final String name) {
+    return (map != null) ? map.float8(name) : list.float8();
+  }
+
+  @Override
+  public BitWriter bit(final String name) {
+    return (map != null) ? map.bit(name) : list.bit();
+  }
+
+  @Override
+  public VarBinaryWriter binary(final String name) {
+    return (map != null) ? map.varBinary(name) : list.varBinary();
+  }
+
+}


Mime
View raw message