drill-commits mailing list archives

From par...@apache.org
Subject drill git commit: DRILL-1692: Fixing Mongo join issue when * is selected
Date Wed, 04 Mar 2015 05:59:18 GMT
Repository: drill
Updated Branches:
  refs/heads/master a66b6cc0d -> 71b6bfe86


DRILL-1692: Fixing Mongo join issue when * is selected


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/71b6bfe8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/71b6bfe8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/71b6bfe8

Branch: refs/heads/master
Commit: 71b6bfe86e5d143acac8ff9affb1b97a5d39d839
Parents: a66b6cc
Author: Kamesh <kam.iitkgp@gmail.com>
Authored: Tue Mar 3 21:22:25 2015 +0530
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Tue Mar 3 21:58:42 2015 -0800

----------------------------------------------------------------------
 .../exec/store/mongo/MongoRecordReader.java     | 102 ++++---------------
 1 file changed, 17 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
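Note (editorial, not part of the commit): the patch removes the separate star-query path that wrote each Mongo document into a single NullableVarCharVector and instead routes every query, star or not, through the JsonReader/VectorContainerWriter path, which is what addresses the join issue when * is selected. The sketch below is a hypothetical standalone rendering of the projection rule the patch introduces; the class name MongoProjectionSketch and its parameters are illustrative only, while the Drill, Guava, and MongoDB driver classes it calls are the ones referenced in the diff that follows.

import java.util.Collection;
import java.util.Set;

import org.apache.drill.common.expression.SchemaPath;

import com.google.common.collect.Sets;
import com.mongodb.BasicDBObject;

class MongoProjectionSketch {

  /**
   * Mirrors transformColumns() after this change: named columns are reduced to
   * their root field and requested from Mongo individually; a star query projects
   * the "*" column (AbstractRecordReader.STAR_COLUMN in the real reader) so the
   * JsonReader materializes every field of each document.
   */
  static Set<SchemaPath> transform(Collection<SchemaPath> projectedColumns,
      boolean isStarQuery, BasicDBObject fields) {
    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
    if (!isStarQuery) {
      for (SchemaPath column : projectedColumns) {
        String fieldName = column.getRootSegment().getPath();
        transformed.add(SchemaPath.getSimplePath(fieldName));
        fields.put(fieldName, Integer.valueOf(1)); // ask Mongo to return this field
      }
    } else {
      transformed.add(SchemaPath.getSimplePath("*")); // star query: project everything
    }
    return transformed;
  }
}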


http://git-wip-us.apache.org/repos/asf/drill/blob/71b6bfe8/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index da96701..15ef197 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.store.mongo;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -29,22 +28,15 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,16 +57,11 @@ import com.mongodb.ServerAddress;
 public class MongoRecordReader extends AbstractRecordReader {
   static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
 
-  private static final int TARGET_RECORD_COUNT = 3000;
-
   private DBCollection collection;
   private DBCursor cursor;
 
-  private NullableVarCharVector valueVector;
-
   private JsonReader jsonReader;
   private VectorContainerWriter writer;
-  private List<SchemaPath> columns;
 
   private BasicDBObject filters;
   private DBObject fields;
@@ -92,9 +79,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     this.fields = new BasicDBObject();
     // exclude _id field, if not mentioned by user.
     this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0));
-    this.columns = projectedColumns;
     setColumns(projectedColumns);
-    transformColumns(projectedColumns);
     this.fragmentContext = context;
     this.filters = new BasicDBObject();
     Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters(
@@ -109,14 +94,13 @@ public class MongoRecordReader extends AbstractRecordReader {
       Collection<SchemaPath> projectedColumns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
     if (!isStarQuery()) {
-      Iterator<SchemaPath> columnIterator = projectedColumns.iterator();
-      while (columnIterator.hasNext()) {
-        SchemaPath column = columnIterator.next();
-        NameSegment root = column.getRootSegment();
-        String fieldName = root.getPath();
+      for (SchemaPath column : projectedColumns ) {
+        String fieldName = column.getRootSegment().getPath();
         transformed.add(SchemaPath.getSimplePath(fieldName));
         this.fields.put(fieldName, Integer.valueOf(1));
       }
+    } else {
+      transformed.add(AbstractRecordReader.STAR_COLUMN);
     }
     return transformed;
   }
@@ -161,96 +145,44 @@ public class MongoRecordReader extends AbstractRecordReader {
 
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
-    if (isStarQuery()) {
-      try {
-        SchemaPath startColumn = SchemaPath.getSimplePath("*");
-        MaterializedField field = MaterializedField.create(startColumn,
-            Types.optional(MinorType.VARCHAR));
-        valueVector = output.addField(field, NullableVarCharVector.class);
-      } catch (SchemaChangeException e) {
-        throw new ExecutionSetupException(e);
-      }
-    } else {
-      this.writer = new VectorContainerWriter(output);
-      this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
-    }
+    this.writer = new VectorContainerWriter(output);
+    this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode);
     logger.info("Filters Applied : " + filters);
     logger.info("Fields Selected :" + fields);
     cursor = collection.find(filters, fields);
   }
 
-  private int handleNonStarQuery() {
+  @Override
+  public int next() {
     writer.allocate();
     writer.reset();
 
     int docCount = 0;
     Stopwatch watch = new Stopwatch();
     watch.start();
-    int rowCount = 0;
 
     try {
-      String errMsg = "Document {} is too big to fit into allocated ValueVector";
-      for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
+      while (docCount < BaseValueVector.INITIAL_VALUE_ALLOCATION && cursor.hasNext()) {
         writer.setPosition(docCount);
         String doc = cursor.next().toString();
         jsonReader.setSource(doc.getBytes(Charsets.UTF_8));
-        if (jsonReader.write(writer) == JsonReader.ReadState.WRITE_SUCCEED) {
-          docCount++;
-        } else {
-          if (docCount == 0) {
-            throw new DrillRuntimeException(errMsg);
-          }
-        }
+        jsonReader.write(writer);
+        docCount++;
       }
 
       jsonReader.ensureAtLeastOneField(writer);
 
       writer.setValueCount(docCount);
       logger.debug("Took {} ms to get {} records",
-          watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+          watch.elapsed(TimeUnit.MILLISECONDS), docCount);
       return docCount;
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      throw new DrillRuntimeException("Failure while reading Mongo Record.", e);
+    } catch (IOException e) {
+      String msg = "Failure while reading document. - Parser was at record: " + (docCount + 1);
+      logger.error(msg, e);
+      throw new DrillRuntimeException(msg, e);
     }
   }
 
-  private int handleStarQuery() {
-    Stopwatch watch = new Stopwatch();
-    watch.start();
-    int rowCount = 0;
-
-    if (valueVector == null) {
-      throw new DrillRuntimeException("Value vector is not initialized!!!");
-    }
-    valueVector.clear();
-    valueVector
-        .allocateNew(4 * 1024 * TARGET_RECORD_COUNT, TARGET_RECORD_COUNT);
-
-    String errMsg = "Document {} is too big to fit into allocated ValueVector";
-
-    try {
-      for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
-        String doc = cursor.next().toString();
-        byte[] record = doc.getBytes(Charsets.UTF_8);
-        valueVector.getMutator().setSafe(rowCount, record, 0,
-            record.length);
-      }
-      valueVector.getMutator().setValueCount(rowCount);
-      logger.debug("Took {} ms to get {} records",
-          watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
-      return rowCount;
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      throw new DrillRuntimeException("Failure while reading Mongo Record.", e);
-    }
-  }
-
-  @Override
-  public int next() {
-    return isStarQuery() ? handleStarQuery() : handleNonStarQuery();
-  }
-
   @Override
   public void cleanup() {
   }

