drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [3/7] drill git commit: DRILL-2006: Updated Text reader. Increases variations of text files Drill can work with.
Date Thu, 07 May 2015 08:56:39 GMT
DRILL-2006: Updated Text reader.  Increases variations of text files Drill can work with.

The text reader is heavily inspired by the uniVocity parser, although it is now byte-based and customized for Drill's memory representations.

This commit also updates the RecordReader interface so that the OperatorContext is passed in at setup time rather than through a separate call.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7ec99871
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7ec99871
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7ec99871

Branch: refs/heads/master
Commit: 7ec99871b97c70793e2e5eb2e795040c5b6ade66
Parents: 151a7f4
Author: Jacques Nadeau <jacques@apache.org>
Authored: Wed May 6 02:56:45 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu May 7 00:06:33 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java |   3 +
 .../exec/store/hbase/HBaseRecordReader.java     |  11 +-
 .../drill/exec/store/hive/HiveRecordReader.java |  11 +-
 .../exec/store/mongo/MongoRecordReader.java     |  11 +-
 exec/java-exec/pom.xml                          |   7 +
 .../codegen/templates/FixedValueVectors.java    |   2 +-
 .../codegen/templates/RepeatedValueVectors.java |  10 +-
 .../templates/VariableLengthVectors.java        |  24 +-
 .../src/main/java/io/netty/buffer/DrillBuf.java |  16 +-
 .../org/apache/drill/exec/ExecConstants.java    |   3 +
 .../exec/client/PrintingResultsListener.java    |  16 +-
 .../drill/exec/physical/impl/ScanBatch.java     |   6 +-
 .../UnorderedReceiverBatch.java                 |   2 +-
 .../server/options/SystemOptionManager.java     |   1 +
 .../exec/store/LocalSyncableFileSystem.java     |  34 +-
 .../apache/drill/exec/store/RecordReader.java   |  10 +-
 .../drill/exec/store/avro/AvroRecordReader.java |  32 +-
 .../drill/exec/store/dfs/DrillFileSystem.java   |  20 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   9 +-
 .../exec/store/dfs/easy/EasyGroupScan.java      |   6 +-
 .../exec/store/easy/json/JSONRecordReader.java  |  36 +-
 .../exec/store/easy/text/TextFormatPlugin.java  | 123 ++++-
 .../compliant/CompliantTextRecordReader.java    | 152 ++++++
 .../text/compliant/RepeatedVarCharOutput.java   | 324 ++++++++++++
 .../StreamFinishedPseudoException.java          |  29 ++
 .../store/easy/text/compliant/TextInput.java    | 392 +++++++++++++++
 .../store/easy/text/compliant/TextOutput.java   |  87 ++++
 .../easy/text/compliant/TextParsingContext.java | 124 +++++
 .../text/compliant/TextParsingSettings.java     | 291 +++++++++++
 .../store/easy/text/compliant/TextReader.java   | 498 +++++++++++++++++++
 .../drill/exec/store/mock/MockRecordReader.java |   2 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   3 -
 .../columnreaders/ParquetRecordReader.java      |   5 +-
 .../exec/store/parquet2/DrillParquetReader.java |   3 +-
 .../drill/exec/store/pojo/PojoRecordReader.java |   2 +-
 .../exec/store/schedule/CompleteFileWork.java   |  17 +-
 .../exec/store/text/DrillTextRecordReader.java  |   4 +-
 .../store/parquet/ParquetRecordReaderTest.java  |   2 +-
 .../exec/store/text/TestNewTextReader.java      |  50 ++
 .../drill/exec/store/text/TestTextColumn.java   |  16 +-
 .../resources/bootstrap-storage-plugins.json    |   3 +-
 .../test/resources/store/text/data/letters.txt  |   6 +-
 .../src/test/resources/textinput/input1.csv     |   1 +
 .../src/test/resources/textinput/input2.csv     |   2 +
 44 files changed, 2248 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 2b9b740..522303f 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -49,6 +49,9 @@ public final class DrillConfig extends NestedConfig{
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
   private final ObjectMapper mapper;
   private final ImmutableList<String> startupArguments;
+
+  public static final boolean ON_OSX = System.getProperty("os.name").contains("OS X");
+
   @SuppressWarnings("restriction")  private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 42038e8..9458db2 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -126,16 +126,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
     return transformed;
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     this.outputMutator = output;
     familyVectorMap = new HashMap<String, MapVector>();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 8c400ea..a4ad0c4 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -249,16 +249,9 @@ public class HiveRecordReader extends AbstractRecordReader {
     }
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     try {
       for (int i = 0; i < selectedColumnNames.size(); i++) {
         MajorType type = getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), true);

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 3c4472c..53c576e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -147,7 +147,8 @@ public class MongoRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     this.writer = new VectorContainerWriter(output);
     this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false);
     logger.info("Filters Applied : " + filters);
@@ -190,13 +191,5 @@ public class MongoRecordReader extends AbstractRecordReader {
   public void cleanup() {
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 8839c54..57cd572 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -54,6 +54,13 @@
       <artifactId>commons-pool2</artifactId>
       <version>2.1</version>
     </dependency>
+    
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>1.3.0</version>
+    </dependency>
+    
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 6a924b7..1059bfb 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -218,7 +218,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     copyFrom(fromIndex, thisIndex, from);
   }
 
-  private void decrementAllocationMonitor() {
+  public void decrementAllocationMonitor() {
     if (allocationMonitor > 0) {
       allocationMonitor = 0;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index c06e29c..c0fba66 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -84,8 +84,16 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     return offsets.getBufferSize() + values.getBufferSize();
   }
 
+  public UInt4Vector getOffsetVector(){
+    return offsets;
+  }
+  
+  public ${minor.class}Vector getValuesVector(){
+    return values;
+  }
+  
   public DrillBuf getBuffer(){
-      return values.getBuffer();
+    return values.getBuffer();
   }
   
   public TransferPair getTransferPair(){

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 8a4b663..7aa7415 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -159,6 +159,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     return buffers;
   }
   
+  public long getOffsetAddr(){
+    return offsetVector.getBuffer().memoryAddress();
+  }
+  
+  public UInt${type.width}Vector getOffsetVector(){
+    return offsetVector;
+  }
+  
   public TransferPair getTransferPair(){
     return new TransferImpl(getField());
   }
@@ -304,16 +312,16 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       data = newBuf;
     }
 
-    private void decrementAllocationMonitor() {
-      if (allocationMonitor > 0) {
-        allocationMonitor = 0;
-      }
-      --allocationMonitor;
+  public void decrementAllocationMonitor() {
+    if (allocationMonitor > 0) {
+      allocationMonitor = 0;
     }
+    --allocationMonitor;
+  }
 
-    private void incrementAllocationMonitor() {
-      ++allocationMonitor;
-    }
+  private void incrementAllocationMonitor() {
+    ++allocationMonitor;
+  }
 
   public Accessor getAccessor(){
     return accessor;

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 2016e1e..7f80f7a 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -38,7 +38,7 @@ import org.apache.drill.exec.util.AssertionUtil;
 
 import com.google.common.base.Preconditions;
 
-public final class DrillBuf extends AbstractByteBuf {
+public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
 
   private static final boolean BOUNDS_CHECKING_ENABLED = AssertionUtil.BOUNDS_CHECKING_ENABLED;
@@ -614,6 +614,15 @@ public final class DrillBuf extends AbstractByteBuf {
     return this;
   }
 
+  public void setByte(int index, byte b){
+    PlatformDependent.putByte(addr(index), b);
+  }
+
+  public void writeByteUnsafe(byte b){
+    PlatformDependent.putByte(addr(readerIndex), b);
+    readerIndex++;
+  }
+
   @Override
   protected byte _getByte(int index) {
     return getByte(index);
@@ -745,4 +754,9 @@ public final class DrillBuf extends AbstractByteBuf {
     return rootBuffer;
   }
 
+  @Override
+  public void close() throws Exception {
+    release();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 9f87b0b..a577815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -212,6 +212,9 @@ public interface ExecConstants {
   public static final String ENABLE_VERBOSE_ERRORS_KEY = "exec.errors.verbose";
   public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY, false);
 
+  public static final String ENABLE_NEW_TEXT_READER_KEY = "exec.storage.enable_new_text_reader";
+  public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY, true);
+
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
   public static final String MAX_LOADING_CACHE_SIZE_CONFIG = "drill.exec.compile.cache_max_size";
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 875160b..f5a119d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -17,18 +17,20 @@
  */
 package org.apache.drill.exec.client;
 
+import io.netty.buffer.DrillBuf;
+
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -36,6 +38,8 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.util.VectorUtil;
 
+import com.google.common.base.Stopwatch;
+
 public class PrintingResultsListener implements UserResultsListener {
   AtomicInteger count = new AtomicInteger();
   private CountDownLatch latch = new CountDownLatch(1);
@@ -45,6 +49,7 @@ public class PrintingResultsListener implements UserResultsListener {
   BufferAllocator allocator;
   volatile UserException exception;
   QueryId queryId;
+  Stopwatch w = new Stopwatch();
 
   public PrintingResultsListener(DrillConfig config, Format format, int columnWidth) {
     this.allocator = new TopLevelAllocator(config);
@@ -56,7 +61,8 @@ public class PrintingResultsListener implements UserResultsListener {
   @Override
   public void submissionFailed(UserException ex) {
     exception = ex;
-    System.out.println("Exception (no rows returned): " + ex );
+    System.out.println("Exception (no rows returned): " + ex + ".  Returned in " + w.elapsed(TimeUnit.MILLISECONDS)
+        + "ms.");
     latch.countDown();
   }
 
@@ -64,7 +70,8 @@ public class PrintingResultsListener implements UserResultsListener {
   public void queryCompleted(QueryState state) {
     allocator.close();
     latch.countDown();
-    System.out.println("Total rows returned: " + count.get());
+    System.out.println("Total rows returned : " + count.get() + ".  Returned in " + w.elapsed(TimeUnit.MILLISECONDS)
+        + "ms.");
   }
 
   @Override
@@ -113,6 +120,7 @@ public class PrintingResultsListener implements UserResultsListener {
 
   @Override
   public void queryIdArrived(QueryId queryId) {
+    w.start();
     this.queryId = queryId;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4700dbd..97e8d28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -92,12 +92,11 @@ public class ScanBatch implements CloseableRecordBatch {
     }
     this.currentReader = readers.next();
     this.oContext = oContext;
-    this.currentReader.setOperatorContext(this.oContext);
 
     boolean setup = false;
     try {
       oContext.getStats().startProcessing();
-      this.currentReader.setup(mutator);
+      this.currentReader.setup(oContext, mutator);
       setup = true;
     } finally {
       // if we had an exception during setup, make sure to release existing data.
@@ -188,8 +187,7 @@ public class ScanBatch implements CloseableRecordBatch {
           currentReader.cleanup();
           currentReader = readers.next();
           partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
-          currentReader.setup(mutator);
-          currentReader.setOperatorContext(oContext);
+          currentReader.setup(oContext, mutator);
           try {
             currentReader.allocate(fieldVectorMap);
           } catch (OutOfMemoryException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index c9d9c11..66a2092 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -80,7 +80,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     oContext = context.newOperatorContext(config, false);
     this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
 
-    this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
+    this.stats = oContext.getStats();
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 33b2a4c..5dff828 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -108,6 +108,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR,
       ExecConstants.DRILLBIT_CONTROLS_VALIDATOR,
       ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
+      ExecConstants.ENABLE_NEW_TEXT_READER
   };
 
   private final PStoreConfig<OptionValue> config;

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
index 37b9c9d..b88cc28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -28,8 +28,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -161,7 +163,7 @@ public class LocalSyncableFileSystem extends FileSystem {
     }
   }
 
-  public class LocalInputStream extends InputStream implements Seekable, PositionedReadable {
+  public class LocalInputStream extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable {
 
     private BufferedInputStream input;
 
@@ -200,6 +202,36 @@ public class LocalSyncableFileSystem extends FileSystem {
       throw new IOException("seekToNewSource not supported");
     }
 
+
+
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+      buf.reset();
+
+      if(buf.hasArray()){
+        int read = read(buf.array(), buf.arrayOffset(), buf.capacity());
+        buf.limit(read);
+        return read;
+      }else{
+        byte[] b = new byte[buf.capacity()];
+        int read = read(b);
+        buf.put(b);
+        return read;
+      }
+
+    }
+
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return input.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return input.read(b, off, len);
+    }
+
     @Override
     public int read() throws IOException {
       byte[] b = new byte[1];

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 09495f5..61ccac5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -39,19 +39,11 @@ public interface RecordReader {
    *          mutating the set of schema values for that particular record.
    * @throws ExecutionSetupException
    */
-  public abstract void setup(OutputMutator output) throws ExecutionSetupException;
+  public abstract void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
 
   public abstract void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
 
   /**
-   * Set the operator context. The Reader can use this to access the operator context and allocate direct memory
-   * if needed
-   * @param operatorContext
-   */
-  public abstract void setOperatorContext(OperatorContext operatorContext);
-
-
-  /**
    * Increment record reader forward, writing into the provided output batch.
    *
    * @return The number of additional records added to the output.

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 59999ba..a52fd22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -17,16 +17,20 @@
  */
 package org.apache.drill.exec.store.avro;
 
-import com.google.common.base.Stopwatch;
-
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericContainer;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -46,15 +50,10 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
 
 /**
  * A RecordReader implementation for Avro data files.
@@ -96,8 +95,8 @@ public class AvroRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(final OutputMutator output) throws ExecutionSetupException {
-
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    operatorContext = context;
     writer = new VectorContainerWriter(output);
 
     try {
@@ -108,15 +107,6 @@ public class AvroRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
   public int next() {
     final Stopwatch watch = new Stopwatch().start();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index f8afe3f..b6a9c30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -19,14 +19,12 @@ package org.apache.drill.exec.store.dfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.util.AssertionUtil;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -53,11 +51,17 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * DrillFileSystem is the wrapper around the actual FileSystem implementation.
  *
@@ -94,6 +98,7 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
 
   private final FileSystem underlyingFs;
   private final OperatorStats operatorStats;
+  private final CompressionCodecFactory codecFactory;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
     this(fsConf, null);
@@ -101,6 +106,7 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
 
   public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
     this.underlyingFs = FileSystem.get(fsConf);
+    this.codecFactory = new CompressionCodecFactory(fsConf);
     this.operatorStats = operatorStats;
   }
 
@@ -717,6 +723,14 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
     }
   }
 
+  /**
+   * Opens the file at the given path, wrapping the stream in a decompressor
+   * when the codec factory finds a codec for the path.
+   *
+   * @param path  file to open
+   * @return the stream returned by open(path), or a decompressing stream on top of it
+   * @throws IOException if the file cannot be opened or the decompressor cannot be created
+   */
+  public InputStream openPossiblyCompressedStream(Path path) throws IOException {
+    final CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
+    final InputStream rawStream = open(path);
+    if (codec == null) {
+      return rawStream;
+    }
+
+    try {
+      return codec.createInputStream(rawStream);
+    } catch (IOException | RuntimeException e) {
+      // The caller never receives rawStream on this path, so release it here.
+      try {
+        rawStream.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
   @Override
   public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream) {
     openedFiles.put(fsDataInputStream, new DebugStackTrace(path, Thread.currentThread().getStackTrace()));

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index b4efe70..762760a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -49,7 +49,6 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -66,7 +65,6 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   private final StoragePluginConfig storageConfig;
   protected final FormatPluginConfig formatConfig;
   private final String name;
-  protected final CompressionCodecFactory codecFactory;
   private final boolean compressible;
 
   protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
@@ -82,7 +80,6 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     this.storageConfig = storageConfig;
     this.formatConfig = formatConfig;
     this.name = name == null ? defaultName : name;
-    this.codecFactory = new CompressionCodecFactory(new Configuration(fsConf));
   }
 
   @Override
@@ -148,8 +145,10 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
         newColumns.add(AbstractRecordReader.STAR_COLUMN);
       }
       // Create a new sub scan object with the new set of columns;
-      scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), newColumns,
-          scan.getSelectionRoot());
+      EasySubScan newScan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
+          newColumns, scan.getSelectionRoot());
+      newScan.setOperatorId(scan.getOperatorId());
+      scan = newScan;
     }
 
     int numParts = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 39dc073..5f9e02b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
+import org.apache.drill.exec.util.ImpersonationUtil;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -49,7 +50,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import org.apache.drill.exec.util.ImpersonationUtil;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan{
@@ -208,7 +208,9 @@ public class EasyGroupScan extends AbstractFileGroupScan{
     Preconditions.checkArgument(!filesForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
+    EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
+    subScan.setOperatorId(this.getOperatorId());
+    return subScan;
   }
 
   private List<FileWorkImpl> convert(List<CompleteFileWork> list) {

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 2666b2e..554c633 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
-
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -38,10 +36,7 @@ import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -114,9 +109,13 @@ public class JSONRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(final OutputMutator output) throws ExecutionSetupException {
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     try{
-      setupData();
+      if (hadoopPath != null) {
+        this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
+      }
+
       this.writer = new VectorContainerWriter(output);
       if (isSkipQuery()) {
         this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer());
@@ -129,18 +128,6 @@ public class JSONRecordReader extends AbstractRecordReader {
     }
   }
 
-  private void setupData() throws IOException{
-    if(hadoopPath != null){
-      final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
-      final CompressionCodec codec = factory.getCodec(hadoopPath); // infers from file ext.
-      if (codec != null) {
-        this.stream = codec.createInputStream(fileSystem.open(hadoopPath));
-      } else {
-        this.stream = fileSystem.open(hadoopPath);
-      }
-    }
-  }
-
   private void setupParser() throws IOException{
     if(hadoopPath != null){
       jsonReader.setSource(stream);
@@ -171,15 +158,6 @@ public class JSONRecordReader extends AbstractRecordReader {
     throw exceptionBuilder.build();
   }
 
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(final OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
   public int next() {
     writer.allocate();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 722650d..5756a6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -33,13 +34,15 @@ import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader;
+import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.hadoop.conf.Configuration;
@@ -47,8 +50,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
@@ -71,9 +77,15 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
       List<SchemaPath> columns) throws ExecutionSetupException {
     Path path = dfs.makeQualified(new Path(fileWork.getPath()));
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
-    Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
-    return new DrillTextRecordReader(split, getFsConf(), context,
-        ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+
+    if (context.getOptions().getOption(ExecConstants.ENABLE_NEW_TEXT_READER_KEY).bool_val == true) {
+      TextParsingSettings settings = new TextParsingSettings();
+      settings.set((TextFormatConfig)formatConfig);
+      return new CompliantTextRecordReader(split, dfs, context, settings, columns);
+    } else {
+      char delim = ((TextFormatConfig)formatConfig).getFieldDelimiter();
+      return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
+    }
   }
 
   @Override
@@ -92,7 +104,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
 
-    options.put("separator", ((TextFormatConfig)getConfig()).getDelimiter());
+    options.put("separator", ((TextFormatConfig)getConfig()).getFieldDelimiterAsString());
     options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
 
     options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
@@ -103,42 +115,117 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     return recordWriter;
   }
 
-  @JsonTypeName("text")
+  @JsonTypeName("text") @JsonInclude(Include.NON_DEFAULT)
   public static class TextFormatConfig implements FormatPluginConfig {
 
     public List<String> extensions;
-    public String delimiter = "\n";
+    public String lineDelimiter = "\n";
+    public char fieldDelimiter = '\n';
+    public char quote = '"';
+    public char escape = '"';
+    public char comment = '#';
+    public boolean skipFirstLine = false;
+
 
     public List<String> getExtensions() {
       return extensions;
     }
 
-    public String getDelimiter() {
-      return delimiter;
+    public char getQuote() {
+      return quote;
+    }
+
+    public char getEscape() {
+      return escape;
+    }
+
+    public char getComment() {
+      return comment;
+    }
+
+    public String getLineDelimiter() {
+      return lineDelimiter;
+    }
+
+    public char getFieldDelimiter() {
+      return fieldDelimiter;
+    }
+
+    @JsonIgnore
+    public String getFieldDelimiterAsString(){
+      return new String(new char[]{fieldDelimiter});
+    }
+
+    @Deprecated
+    @JsonProperty("delimiter")
+    public void setFieldDelimiter(char delimiter){
+      this.fieldDelimiter = delimiter;
+    }
+
+    public boolean isSkipFirstLine() {
+      return skipFirstLine;
     }
 
     @Override
     public int hashCode() {
-      return 33;
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + comment;
+      result = prime * result + escape;
+      result = prime * result + ((extensions == null) ? 0 : extensions.hashCode());
+      result = prime * result + fieldDelimiter;
+      result = prime * result + ((lineDelimiter == null) ? 0 : lineDelimiter.hashCode());
+      result = prime * result + quote;
+      result = prime * result + (skipFirstLine ? 1231 : 1237);
+      return result;
     }
 
     @Override
     public boolean equals(Object obj) {
       if (this == obj) {
         return true;
-      } else if (obj == null) {
+      }
+      if (obj == null) {
         return false;
-      } else if (!(obj instanceof TextFormatConfig)) {
+      }
+      if (getClass() != obj.getClass()) {
         return false;
       }
-
-      TextFormatConfig that = (TextFormatConfig) obj;
-      if (this.delimiter.equals(that.delimiter)) {
-        return true;
+      TextFormatConfig other = (TextFormatConfig) obj;
+      if (comment != other.comment) {
+        return false;
       }
-      return false;
+      if (escape != other.escape) {
+        return false;
+      }
+      if (extensions == null) {
+        if (other.extensions != null) {
+          return false;
+        }
+      } else if (!extensions.equals(other.extensions)) {
+        return false;
+      }
+      if (fieldDelimiter != other.fieldDelimiter) {
+        return false;
+      }
+      if (lineDelimiter == null) {
+        if (other.lineDelimiter != null) {
+          return false;
+        }
+      } else if (!lineDelimiter.equals(other.lineDelimiter)) {
+        return false;
+      }
+      if (quote != other.quote) {
+        return false;
+      }
+      if (skipFirstLine != other.skipFirstLine) {
+        return false;
+      }
+      return true;
     }
 
+
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
new file mode 100644
index 0000000..b2af32d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+// New text reader, complies with the RFC 4180 standard for text/csv files
+public class CompliantTextRecordReader extends AbstractRecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextRecordReader.class);
+
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+  static final int READ_BUFFER = 1024*1024;
+  private static final int WHITE_SPACE_BUFFER = 64*1024;
+
+  // settings to be used while parsing
+  private TextParsingSettings settings;
+  // Chunk of the file to be read by this reader
+  private FileSplit split;
+  // text reader implementation
+  private TextReader reader;
+  // input buffer
+  private DrillBuf readBuffer;
+  // working buffer to handle whitespaces
+  private DrillBuf whitespaceBuffer;
+  private DrillFileSystem dfs;
+
+  /**
+   * Creates a reader for one split of a text file. Only stores its arguments and
+   * registers the projected columns; the file itself is opened in
+   * {@link #setup(OperatorContext, OutputMutator)}.
+   *
+   * @param split  chunk of the file to be read by this reader
+   * @param dfs  file system used to open the split's file
+   * @param context  fragment context; not used by this constructor
+   * @param settings  settings to be used while parsing
+   * @param columns  columns selected in the query, passed to setColumns
+   */
+  public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, FragmentContext context, TextParsingSettings settings, List<SchemaPath> columns) {
+    this.split = split;
+    this.settings = settings;
+    this.dfs = dfs;
+    setColumns(columns);
+  }
+
+  /**
+   * Checks whether we are querying all columns (star) or individual columns.
+   * <p>
+   * When the repeated varchar output is in use, a query that selects the bare
+   * <code>columns</code> path is treated as a star query in addition to whatever
+   * the base class reports.
+   *
+   * @return true if all fields are selected
+   */
+  @Override
+  public boolean isStarQuery() {
+    if (!settings.isUseRepeatedVarChar()) {
+      // Delegate to the base class; calling isStarQuery() here would recurse forever.
+      return super.isStarQuery();
+    }
+
+    return super.isStarQuery() || Iterables.tryFind(getColumns(), new Predicate<SchemaPath>() {
+      @Override
+      public boolean apply(@Nullable SchemaPath path) {
+        // Compare from the constant's side so a null path cannot raise an NPE.
+        return RepeatedVarCharOutput.COLUMNS.equals(path);
+      }
+    }).isPresent();
+  }
+
+  /**
+   * Performs the initial setup required for the record reader.
+   * Initializes the input stream, handling of the output record batch
+   * and the actual reader to be used.
+   * <p>
+   * If setup fails before the stream has been passed to the text reader, the
+   * stream is closed here, because nothing else holds a reference to it.
+   *
+   * @param context  operator context from which buffers will be allocated and managed
+   * @param outputMutator  Used to create the schema in the output record batch
+   * @throws ExecutionSetupException if the output schema cannot be created, or the file
+   *         cannot be opened or the reader cannot be started
+   */
+  @Override
+  public void setup(OperatorContext context, OutputMutator outputMutator) throws ExecutionSetupException {
+    readBuffer = context.getManagedBuffer(READ_BUFFER);
+    whitespaceBuffer = context.getManagedBuffer(WHITE_SPACE_BUFFER);
+
+    InputStream stream = null;
+    // True once this.reader references the stream; from then on cleanup() is
+    // responsible for closing it through reader.close().
+    boolean streamHandedOff = false;
+    try {
+      stream = dfs.openPossiblyCompressedStream(split.getPath());
+      TextInput input = new TextInput(settings,  stream, readBuffer, split.getStart(), split.getStart() + split.getLength());
+
+      TextOutput output = null;
+      if(settings.isUseRepeatedVarChar()){
+        output = new RepeatedVarCharOutput(outputMutator, getColumns(), isStarQuery());
+      }else{
+        //TODO: Add field output.
+        throw new UnsupportedOperationException();
+      }
+
+      this.reader = new TextReader(settings, input, output, whitespaceBuffer);
+      streamHandedOff = true;
+      reader.start();
+    } catch (SchemaChangeException | IOException e) {
+      closeUnownedStream(stream, streamHandedOff, e);
+      throw new ExecutionSetupException(String.format("Failure while setting up text reader for file %s", split.getPath()), e);
+    } catch (RuntimeException e) {
+      closeUnownedStream(stream, streamHandedOff, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Closes a stream that was opened during a failed setup and never handed to
+   * the reader. A failure to close is attached to the original failure rather
+   * than replacing it.
+   */
+  private static void closeUnownedStream(InputStream stream, boolean handedOff, Throwable failure) {
+    if (stream == null || handedOff) {
+      return;
+    }
+    try {
+      stream.close();
+    } catch (IOException closeFailure) {
+      failure.addSuppressed(closeFailure);
+    }
+  }
+
+
+  /**
+   * Generates the next record batch. Parses records until either the per-batch
+   * limit is reached or the reader reports that no further record was parsed.
+   *
+   * @return  number of records in the batch
+   * @throws DrillRuntimeException if reading from the file fails
+   */
+  @Override
+  public int next() {
+    reader.resetForNextBatch();
+    int cnt = 0;
+
+    try{
+      while(cnt < MAX_RECORDS_PER_BATCH && reader.parseNext()){
+        cnt++;
+      }
+      reader.finishBatch();
+      return cnt;
+    }catch(IOException e){
+      // This is a read failure, not a setup failure; say so in the message.
+      throw new DrillRuntimeException(String.format("Failure while reading text file %s.  Happened at or shortly before byte position %d.", split.getPath(), reader.getPos()), e);
+    }
+  }
+
+  /**
+   * Cleanup state once we are finished processing all the records.
+   * This would internally close the input stream we are reading from.
+   * Does nothing when no reader was created, which is the case if setup was
+   * never called or failed before the reader was assigned.
+   */
+  @Override
+  public void cleanup() {
+    if (reader == null) {
+      return;
+    }
+    try {
+      reader.close();
+    } catch (IOException e) {
+      logger.warn("Exception while closing stream.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
new file mode 100644
index 0000000..3ad5c2a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class is responsible for generating record batches for text file inputs. We generate
+ * a record batch with a single vector of type repeated varchar vector. Each record is a single
+ * value within the vector containing all the fields in the record as individual array elements.
+ */
+class RepeatedVarCharOutput extends TextOutput {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedVarCharOutput.class);
+
+  static final String COL_NAME = "columns";
+  static final FieldReference REF = new FieldReference(COL_NAME);
+  static final SchemaPath COLUMNS = SchemaPath.getSimplePath("columns");
+  public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+
+  // output vector
+  private final RepeatedVarCharVector vector;
+
+  // mutator for the output vector
+  private final RepeatedVarCharVector.Mutator mutator;
+
+  // boolean array indicating which fields are selected (if star query entire array is set to true)
+  private final boolean[] collectedFields;
+
+  // pointer to keep track of the offsets per record
+  private long repeatedOffset;
+
+  // pointer to keep track of the original offsets per record
+  private long repeatedOffsetOriginal;
+
+  // pointer to end of the offset buffer
+  private long repeatedOffsetMax;
+
+  // pointer to the start of the actual data buffer
+  private long characterDataOriginal;
+
+  // pointer to the current location of the data buffer
+  private long characterData;
+
+  // pointer to the end of the data buffer
+  private long characterDataMax;
+
+  // current pointer into the buffer that keeps track of the length of individual fields
+  private long charLengthOffset;
+
+  // pointer to the start of the length buffer
+  private long charLengthOffsetOriginal;
+
+  // pointer to the end of length buffer
+  private long charLengthOffsetMax;
+
+  // pointer to the beginning of the record
+  private long recordStart;
+
+  // total number of records processed (across batches)
+  private long recordCount;
+
+  // number of records processed in this current batch
+  private int batchIndex;
+
+  // current index of the field being processed within the record
+  private int fieldIndex = -1;
+
+  /* boolean to indicate if we are currently appending data to the output vector
+   * Its set to false when we have hit out of memory or we are not interested in
+   * the particular field
+   */
+  private boolean collect;
+
+  // are we currently appending to a field
+  private boolean fieldOpen;
+
+  // maximum number of fields/columns
+  private final int maxField;
+
+  /**
+   * We initialize and add the repeated varchar vector to the record batch in this
+   * constructor. Perform some sanity checks if the selected columns are valid or not.
+   * <p>
+   * For a star query every field position is marked as collected. Otherwise only
+   * the array indexes named in the selected columns are marked, and the highest
+   * of them becomes the last field of interest.
+   *
+   * @param outputMutator  Used to create/modify schema in the record batch
+   * @param columns  List of columns selected in the query
+   * @param isStarQuery  boolean to indicate if all fields are selected or not
+   * @throws SchemaChangeException
+   * @throws IllegalArgumentException if a selected column is not 'columns' or a plain '*',
+   *         is not an array index, or names an index outside [0, MAXIMUM_NUMBER_COLUMNS)
+   */
+  public RepeatedVarCharOutput(OutputMutator outputMutator, Collection<SchemaPath> columns, boolean isStarQuery) throws SchemaChangeException {
+    super();
+
+    MaterializedField field = MaterializedField.create(REF, Types.repeated(TypeProtos.MinorType.VARCHAR));
+    this.vector = outputMutator.addField(field, RepeatedVarCharVector.class);
+
+    this.mutator = vector.getMutator();
+
+    final boolean[] fields = new boolean[MAXIMUM_NUMBER_COLUMNS];
+
+    // When no explicit array index is selected, keep the full width as the limit.
+    int highestField = fields.length;
+
+    if (isStarQuery) {
+      Arrays.fill(fields, true);
+    } else {
+      boolean indexSeen = false;
+      for (SchemaPath path : columns) {
+        assert path.getRootSegment().isNamed();
+        final String pathStr = path.getRootSegment().getPath();
+        Preconditions.checkArgument(pathStr.equals(COL_NAME) || (pathStr.equals("*") && path.getRootSegment().getChild() == null),
+            "Selected column(s) must have name 'columns' or must be plain '*'");
+
+        if (path.getRootSegment().getChild() != null) {
+          Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), "Selected column must be an array index");
+          final int index = path.getRootSegment().getChild().getArraySegment().getIndex();
+          // Reject indexes that would fall outside the selection array.
+          Preconditions.checkArgument(index >= 0 && index < MAXIMUM_NUMBER_COLUMNS,
+              "Selected column index %s is outside the supported range [0, %s)", index, MAXIMUM_NUMBER_COLUMNS);
+
+          fields[index] = true;
+          // The first index replaces the full-width default; later ones can only raise it.
+          highestField = indexSeen ? Math.max(highestField, index) : index;
+          indexSeen = true;
+        }
+      }
+    }
+
+    this.collectedFields = fields;
+    this.maxField = highestField;
+  }
+
+  /**
+   * Start a new record batch. Resets all the offsets and pointers that
+   * store buffer addresses
+   */
+  public void startBatch() {
+    this.fieldOpen = false;
+    this.batchIndex = 0;
+    this.fieldIndex = -1;
+    this.collect = true;
+
+    loadRepeatedOffsetAddress();
+    loadVarCharOffsetAddress();
+    loadVarCharDataAddress();
+
+    // Must come after loadVarCharDataAddress(): that call refreshes
+    // characterDataOriginal, and the first record of the batch starts there.
+    // Reading it earlier would pick up the previous batch's address (or 0).
+    this.recordStart = characterDataOriginal;
+  }
+
+  /**
+   * Re-reads the addresses of the repeated-offset buffer. Both the current and
+   * the original pointer are placed four bytes past the buffer start, and the
+   * max pointer at the buffer start plus its capacity.
+   */
+  private void loadRepeatedOffsetAddress(){
+    final DrillBuf offsets = vector.getOffsetVector().getBuffer();
+    checkBuf(offsets);
+
+    final long firstSlot = offsets.memoryAddress() + 4;
+    this.repeatedOffsetOriginal = firstSlot;
+    this.repeatedOffset = firstSlot;
+    this.repeatedOffsetMax = offsets.memoryAddress() + offsets.capacity();
+  }
+
+  /**
+   * Re-reads the addresses of the character data buffer. The current and the
+   * original pointer are placed at the buffer start, and the max pointer at
+   * the buffer start plus its capacity.
+   */
+  private void loadVarCharDataAddress(){
+    final DrillBuf data = vector.getValuesVector().getBuffer();
+    checkBuf(data);
+
+    final long start = data.memoryAddress();
+    this.characterDataOriginal = start;
+    this.characterData = start;
+    this.characterDataMax = data.memoryAddress() + data.capacity();
+  }
+
+  /**
+   * Re-reads the addresses of the buffer that holds the per-field offsets. The
+   * current and the original pointer are placed four bytes past the buffer
+   * start, and the max pointer at the buffer start plus its capacity.
+   */
+  private void loadVarCharOffsetAddress(){
+    final DrillBuf lengths = vector.getValuesVector().getOffsetVector().getBuffer();
+    checkBuf(lengths);
+
+    // add four as offsets conceptually start at 1. (first item is 0..1)
+    final long firstSlot = lengths.memoryAddress() + 4;
+    this.charLengthOffsetOriginal = firstSlot;
+    this.charLengthOffset = firstSlot;
+    this.charLengthOffsetMax = lengths.memoryAddress() + lengths.capacity();
+  }
+
+  /**
+   * Reallocates the per-field offset vector and re-points the write position at
+   * the same distance from the start of the new buffer.
+   */
+  private void expandVarCharOffsets(){
+    // Distance already written, measured against the buffer in use before reAlloc.
+    final long written = charLengthOffset - charLengthOffsetOriginal;
+
+    vector.getValuesVector().getOffsetVector().reAlloc();
+    loadVarCharOffsetAddress();
+    charLengthOffset = charLengthOffset + written;
+  }
+
+  /**
+   * Reallocates the character data vector and re-points both the write position
+   * and the start of the current record at the same distances from the start of
+   * the new buffer. Without rebasing recordStart, rowHasData() would compare an
+   * address in the old buffer with one in the new buffer.
+   */
+  private void expandVarCharData(){
+    vector.getValuesVector().reAlloc();
+    // Both distances are relative to the buffer start recorded before reloading.
+    long diff = characterData - characterDataOriginal;
+    long recordDiff = recordStart - characterDataOriginal;
+    loadVarCharDataAddress();
+    characterData += diff;
+    recordStart = characterDataOriginal + recordDiff;
+  }
+
+  /**
+   * Reallocates the repeated-offset vector and re-points the write position at
+   * the same distance from the start of the new buffer.
+   */
+  private void expandRepeatedOffsets(){
+    // Distance already written, measured against the buffer in use before reAlloc.
+    final long written = repeatedOffset - repeatedOffsetOriginal;
+
+    vector.getOffsetVector().reAlloc();
+    loadRepeatedOffsetAddress();
+    repeatedOffset = repeatedOffset + written;
+  }
+
+  /**
+   * Guards raw address access: rejects a buffer whose reference count has
+   * dropped below one.
+   * @param b  working drill buffer
+   * @throws IllegalStateException if the reference count is less than one
+   */
+  private void checkBuf(DrillBuf b){
+    final boolean stillReferenced = b.refCnt() >= 1;
+    if (stillReferenced) {
+      return;
+    }
+    throw new IllegalStateException("Cannot access a dereferenced buffer.");
+  }
+
+  /**
+   * Marks the start of a field: records its index, opens the field and looks
+   * up whether data for this index is to be collected.
+   * @param index  position of the field within the record
+   */
+  @Override
+  public void startField(int index) {
+    final boolean wanted = collectedFields[index];
+    this.fieldIndex = index;
+    this.fieldOpen = true;
+    this.collect = wanted;
+  }
+
+  /**
+   * Closes the current field by writing the end offset of its character data
+   * into the next four-byte slot of the offset buffer, growing that buffer
+   * first when it is full.
+   * @return true while the current field index is below the last selected field
+   */
+  @Override
+  public boolean endField() {
+    this.fieldOpen = false;
+
+    if (charLengthOffsetMax <= charLengthOffset) {
+      expandVarCharOffsets();
+    }
+
+    // End position of this field, relative to the start of the data buffer.
+    final long bytesWritten = characterData - characterDataOriginal;
+    PlatformDependent.putInt(charLengthOffset, (int) bytesWritten);
+    charLengthOffset = charLengthOffset + 4;
+
+    final boolean beforeLastWantedField = fieldIndex < maxField;
+    return beforeLastWantedField;
+  }
+
+  /**
+   * Closes a field for which no data was appended; handled exactly like any
+   * other field.
+   * @return the result of {@link #endField()}
+   */
+  @Override
+  public boolean endEmptyField() {
+    final boolean result = endField();
+    return result;
+  }
+
+  /**
+   * Appends one byte to the character data buffer, growing the buffer first
+   * when it is full. The byte is dropped when collection is switched off.
+   * @param data  byte to append
+   */
+  @Override
+  public void append(byte data) {
+    if (collect) {
+      if (characterDataMax <= characterData) {
+        expandVarCharData();
+      }
+
+      PlatformDependent.putByte(characterData, data);
+      characterData = characterData + 1;
+    }
+  }
+
+  /**
+   * @return total number of records processed (across batches)
+   */
+  @Override
+  public long getRecordCount() {
+    return this.recordCount;
+  }
+
+  /**
+   * @return true if the data write position has moved past the start of the
+   *         current record
+   */
+  @Override
+  public boolean rowHasData() {
+    return characterData > recordStart;
+  }
+
+  /**
+   * Finishes the current record: closes a field that is still open, writes the
+   * number of field offsets written so far into the next slot of the repeated
+   * offset buffer (growing it first when full), and counts the record unless no
+   * field was started.
+   */
+  @Override
+  public void finishRecord() {
+    // The next record begins where the character data currently ends.
+    recordStart = characterData;
+
+    if (fieldOpen) {
+      endField();
+    }
+
+    if (repeatedOffsetMax <= repeatedOffset) {
+      expandRepeatedOffsets();
+    }
+
+    // Each field offset occupies four bytes, hence the division.
+    final int fieldsWritten = ((int) (charLengthOffset - charLengthOffsetOriginal)) / 4;
+    PlatformDependent.putInt(repeatedOffset, fieldsWritten);
+    repeatedOffset = repeatedOffset + 4;
+
+    // if there were no defined fields, skip.
+    if (fieldIndex <= -1) {
+      return;
+    }
+    batchIndex++;
+    recordCount++;
+  }
+
+
+  /**
+   * Sets the record count in this batch within the value vector.
+   */
+  @Override
+  public void finishBatch() {
+    final int recordsInBatch = batchIndex;
+    mutator.setValueCount(recordsInBatch);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
new file mode 100644
index 0000000..ab9ee0d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+class StreamFinishedPseudoException extends RuntimeException {
+
+  public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException();
+
+  private StreamFinishedPseudoException() {
+    super("", null, false, true);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
new file mode 100644
index 0000000..c764f56
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+/*******************************************************************************
+ * Copyright 2014 uniVocity Software Pty Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import com.google.common.base.Preconditions;
+import com.univocity.parsers.common.Format;
+
+/**
+ * Class that fronts an InputStream to provide a byte consumption interface.
+ * Also manages only reading lines to and from each split.
+ */
+final class TextInput {
+
+  private static final byte NULL_BYTE = (byte) '\0';
+  private final byte lineSeparator1;
+  private final byte lineSeparator2;
+  private final byte normalizedLineSeparator;
+  private final TextParsingSettings settings;
+
+  private long lineCount;
+  private long charCount;
+
+  /**
+   * The starting position in the file.
+   */
+  private final long startPos;
+  private final long endPos;
+
+  private int bufferMark;
+  private long streamMark;
+
+  private long streamPos;
+
+  private final Seekable seekable;
+  private final FSDataInputStream inputFS;
+  private final InputStream input;
+
+  private final DrillBuf buffer;
+  private final ByteBuffer underlyingBuffer;
+  private final long bStart;
+  private final long bStartMinus1;
+
+  private final boolean bufferReadable;
+
+  /**
+   * Whether there was a possible partial line separator on the previous
+   * read so we dropped it and it should be appended to next read.
+   */
+  private boolean remByte = false;
+
+  /**
+   * The current position in the buffer.
+   */
+  public int bufferPtr;
+
+  /**
+   * The quantity of valid data in the buffer.
+   */
+  public int length = -1;
+
+  private boolean endFound = false;
+
+  /**
+   * Creates a new instance with the mandatory characters for handling newlines transparently.
+   * @param settings parsing settings; they supply the lineSeparator (the sequence of one or two bytes that represents a newline, as defined in {@link Format#getLineSeparator()})
+   *   and the normalizedLineSeparator (the normalized newline character, as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
+   */
+  public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+    byte[] lineSeparator = settings.getNewLineDelimiter();
+    byte normalizedLineSeparator = settings.getNormalizedNewLine();
+    Preconditions.checkArgument(lineSeparator != null && (lineSeparator.length == 1 || lineSeparator.length == 2), "Invalid line separator. Expected 1 to 2 characters");
+    Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
+    boolean isCompressed = input instanceof CompressionInputStream ;
+    Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
+
+    // splits aren't allowed with compressed data.  The split length will be the compressed size which means we'll normally end prematurely.
+    if(isCompressed && endPos > 0){
+      endPos = Long.MAX_VALUE;
+    }
+
+    this.input = input;
+    this.seekable = (Seekable) input;
+    this.settings = settings;
+
+    if(input instanceof FSDataInputStream){
+      this.inputFS = (FSDataInputStream) input;
+      this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
+    }else{
+      this.inputFS = null;
+      this.bufferReadable = false;
+    }
+
+    this.startPos = startPos;
+    this.endPos = endPos;
+
+    this.lineSeparator1 = lineSeparator[0];
+    this.lineSeparator2 = lineSeparator.length == 2 ? lineSeparator[1] : NULL_BYTE;
+    this.normalizedLineSeparator = normalizedLineSeparator;
+
+    this.buffer = readBuffer;
+    this.bStart = buffer.memoryAddress();
+    this.bStartMinus1 = bStart -1;
+    this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
+  }
+
+  /**
+   * Test the input to position for read start.  If the input is a non-zero split or
+   * skipFirstLine is enabled, input will move to the start of the next complete line.
+   * @throws IOException
+   */
+  final void start() throws IOException {
+    lineCount = 0;
+    if(startPos > 0){
+      seekable.seek(startPos);
+    }
+
+    updateBuffer();
+    if (length > 0) {
+      if(startPos > 0 || settings.isSkipFirstLine()){
+
+        // move to next full record.
+        skipLines(1);
+      }
+    }
+  }
+
+
+  /**
+   * Helper method to get the most recent characters consumed since the last record started.
+   * May get an incomplete string since we don't support stream rewind.  Currently returns a single-space placeholder string.
+   * @return String of last few bytes (for now, always a single space).
+   * @throws IOException
+   */
+  public String getStringSinceMarkForError() throws IOException {
+    return " ";
+  }
+
+  long getPos(){
+    return streamPos + bufferPtr;
+  }
+
+  public void mark(){
+    streamMark = streamPos;
+    bufferMark = bufferPtr;
+  }
+
+  /**
+   * Read more bytes from the stream.  Uses the zero-copy interface if available; otherwise copies through a temporary byte array.
+   * @throws IOException
+   */
+  private final void read() throws IOException {
+    if(bufferReadable){
+
+      if(remByte){
+        underlyingBuffer.put(lineSeparator1);
+        remByte = false;
+      }
+      length = inputFS.read(underlyingBuffer);
+
+    }else{
+
+      byte[] b = new byte[underlyingBuffer.capacity()];
+      if(remByte){
+        b[0] = lineSeparator1;
+        length = input.read(b, 1, b.length - 1);
+        remByte = false;
+      }else{
+        length = input.read(b);
+      }
+
+      underlyingBuffer.put(b);
+    }
+  }
+
+
+  /**
+   * Read more data into the buffer.  Will also manage split end conditions.
+   * @throws IOException
+   */
+  private final void updateBuffer() throws IOException {
+    streamPos = seekable.getPos();
+    underlyingBuffer.clear();
+
+    if(endFound){
+      length = -1;
+      return;
+    }
+
+    read();
+
+    // check our data read allowance.
+    if(streamPos + length >= this.endPos){
+      updateLengthBasedOnConstraint();
+    }
+
+    charCount += bufferPtr;
+    bufferPtr = 1;
+
+    buffer.writerIndex(underlyingBuffer.limit());
+    buffer.readerIndex(underlyingBuffer.position());
+
+  }
+
+  /**
+   * Checks to see if we can go over the end of our bytes constraint on the data.  If so,
+   * adjusts so that we can only read to the last character of the first line that crosses
+   * the split boundary.
+   */
+  private void updateLengthBasedOnConstraint(){
+    // we've run over our allotted data.
+    final byte lineSeparator1 = this.lineSeparator1;
+    final byte lineSeparator2 = this.lineSeparator2;
+
+    // find the next line separator:
+    final long max = bStart + length;
+
+    for(long m = this.bStart + (endPos - streamPos); m < max; m++){
+      if(PlatformDependent.getByte(m) == lineSeparator1){
+        // we found a potential line break.
+
+        if(lineSeparator2 == NULL_BYTE){
+          // we found a line separator and don't need to consult the next byte.
+          length = (int)(m - bStart);
+          endFound = true;
+          break;
+        }else{
+          // this is a two byte line separator.
+
+          long mPlus = m+1;
+          if(mPlus < max){
+            // we can check next byte and see if the second lineSeparator is correct.
+            if(lineSeparator2 == PlatformDependent.getByte(mPlus)){
+              length = (int)(mPlus - bStart);
+              endFound = true;
+              break;
+            }else{
+              // this was a partial line break.
+              continue;
+            }
+          }else{
+            // the last character of the read was a remnant byte.  We'll hold off on dealing with this byte until the next read.
+            remByte = true;
+            length -= 1;
+            break;
+          }
+
+        }
+      }
+    }
+  }
+
+  /**
+   * Get next byte from stream.  Also maintains the current line count.  Will throw a StreamFinishedPseudoException
+   * when the stream has run out of bytes.
+   * @return next byte from stream.
+   * @throws IOException
+   */
+  public final byte nextChar() throws IOException {
+    final byte lineSeparator1 = this.lineSeparator1;
+    final byte lineSeparator2 = this.lineSeparator2;
+
+    if (length == -1) {
+      throw StreamFinishedPseudoException.INSTANCE;
+    }
+
+    if(AssertionUtil.BOUNDS_CHECKING_ENABLED){
+      buffer.checkBytes(bufferPtr - 1, bufferPtr);
+    }
+
+    byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr);
+
+    if (bufferPtr >= length) {
+      if (length != -1) {
+        updateBuffer();
+        bufferPtr--;
+      } else {
+        throw StreamFinishedPseudoException.INSTANCE;
+      }
+    }
+
+    bufferPtr++;
+
+    // monitor for next line.
+    if (lineSeparator1 == byteChar && (lineSeparator2 == NULL_BYTE || lineSeparator2 == buffer.getByte(bufferPtr - 1))) {
+      lineCount++;
+
+      if (lineSeparator2 != NULL_BYTE) {
+        byteChar = normalizedLineSeparator;
+
+        if (bufferPtr >= length) {
+          if (length != -1) {
+            updateBuffer();
+          } else {
+            throw StreamFinishedPseudoException.INSTANCE;
+          }
+        }
+      }
+    }
+    return byteChar;
+  }
+
+  /**
+   * Number of lines read since the start of this split.
+   * @return the current line count
+   */
+  public final long lineCount() {
+    return lineCount;
+  }
+
+  /**
+   * Skip forward the number of line delimiters.  If you are in the middle of a line,
+   * a value of 1 will skip to the start of the next record.
+   * @param lines Number of lines to skip.
+   * @throws IOException
+   */
+  public final void skipLines(int lines) throws IOException {
+    if (lines < 1) {
+      return;
+    }
+    long expectedLineCount = this.lineCount + lines;
+
+    try {
+      do {
+        nextChar();
+      } while (lineCount < expectedLineCount);
+      if (lineCount < lines) {
+        throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+      }
+    } catch (EOFException ex) {
+      throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+    }
+  }
+
+  public final long charCount() {
+    return charCount + bufferPtr;
+  }
+
+  public long getLineCount() {
+    return lineCount;
+  }
+
+  public void close() throws IOException{
+    input.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
new file mode 100644
index 0000000..66b1165
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+/* Base class for producing output record batches while dealing with
+ * Text files.
+ */
+abstract class TextOutput {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextOutput.class);
+
+  /**
+   * Start processing a new field within a record.
+   * @param index  index within the record
+   */
+  public abstract void startField(int index);
+
+  /**
+   * End processing a field within a record.
+   * @return  true if engine should continue processing record.  false if rest of record can be skipped.
+   */
+  public abstract boolean endField();
+
+  /**
+   * Shortcut that lets the output know that we are ending a field with no data.
+   * @return true if engine should continue processing record.  false if rest of record can be skipped.
+   */
+  public abstract boolean endEmptyField();
+
+  /**
+   * Add the provided data but drop any whitespace.
+   * @param data
+   */
+  public void appendIgnoringWhitespace(byte data){
+    if(TextReader.isWhite(data)){
+      // noop
+    }else{
+      append(data);
+    }
+  }
+
+  /**
+   * This function appends the byte to the output character data buffer
+   * @param data  current byte read
+   */
+  public abstract void append(byte data);
+
+  /**
+   * Completes the processing of a given record. Also completes the processing of the
+   * last field being read.
+   */
+  public abstract void finishRecord();
+
+  /**
+   *  Return the total number of records (across batches) processed
+   */
+  public abstract long getRecordCount();
+
+  /**
+   * Informs output to setup for new record batch.
+   */
+  public abstract void startBatch();
+
+  /**
+   * Does any final cleanup that is required for closing a batch.  Example might include closing the last field.
+   */
+  public abstract void finishBatch();
+
+  /**
+   * Helper method to check if the current record has any non-empty fields
+   */
+  public abstract boolean rowHasData();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
new file mode 100644
index 0000000..7b95ee2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import java.io.IOException;
+
+import com.univocity.parsers.common.ParserOutput;
+import com.univocity.parsers.common.ParsingContext;
+import com.univocity.parsers.common.input.CharInputReader;
+
+class TextParsingContext implements ParsingContext {
+
+  private final TextInput input;
+  private final TextOutput output;
+  protected boolean stopped = false;
+
+  private int[] extractedIndexes = null;
+
+  public TextParsingContext(TextInput input, TextOutput output) {
+    this.input = input;
+    this.output = output;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void stop() {
+    stopped = true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long currentLine() {
+    return input.lineCount();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long currentChar() {
+    return input.charCount();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int currentColumn() {
+    return -1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String[] headers() {
+    return new String[]{};
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int[] extractedFieldIndexes() {
+    return extractedIndexes;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long currentRecord() {
+    return output.getRecordCount();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String currentParsedContent() {
+    try {
+      return input.getStringSinceMarkForError();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void skipLines(int lines) {
+  }
+
+  @Override
+  public boolean columnsReordered() {
+    return false;
+  }
+}
+


Mime
View raw message