drill-commits mailing list archives

From jacq...@apache.org
Subject [3/9] drill git commit: DRILL-3963: Add Sequence file support.
Date Thu, 05 Nov 2015 05:56:41 GMT
DRILL-3963: Add Sequence file support.

- Add sequencefile format plugin, format config, and record reader.
- Add test for sequencefile reader in Impersonation test suite.
- Create hadoop based record reader under user proxy.
- Fix impersonation test for sequence file and add one for Avro.

This closes #214
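
For reviewers trying the patch: the new reader exposes each sequence file as two NullableVarBinary columns, binary_key and binary_value (see SequenceFileRecordReader below). A minimal query, mirroring the one in the added TestSequenceFileReader; the dfs_test workspace and file path here are illustrative, not part of the patch:

  SELECT convert_from(t.binary_key, 'UTF8') AS k,
         convert_from(t.binary_value, 'UTF8') AS v
  FROM dfs_test.`/sequencefiles/simple.seq` t;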


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9edce154
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9edce154
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9edce154

Branch: refs/heads/master
Commit: 9edce154238548da92963e34a041272fa1e2c745
Parents: ef1cb72
Author: Amit Hadke <amit.hadke@gmail.com>
Authored: Wed Oct 21 13:55:40 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Wed Nov 4 20:55:53 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/store/avro/AvroFormatPlugin.java |   5 +-
 .../drill/exec/store/avro/AvroRecordReader.java |  35 +++-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   4 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |   2 +-
 .../sequencefile/SequenceFileFormatConfig.java  |  52 ++++++
 .../sequencefile/SequenceFileFormatPlugin.java  |  92 ++++++++++
 .../sequencefile/SequenceFileRecordReader.java  | 167 +++++++++++++++++++
 .../exec/store/easy/text/TextFormatPlugin.java  |   2 +-
 .../resources/bootstrap-storage-plugins.json    |   8 +
 .../impersonation/BaseTestImpersonation.java    |  39 ++++-
 .../impersonation/TestImpersonationQueries.java | 106 +++++++-----
 .../sequencefile/TestSequenceFileReader.java    |  51 ++++++
 .../resources/bootstrap-storage-plugins.json    |   4 +
 .../src/test/resources/sequencefiles/simple.seq | Bin 0 -> 207 bytes
 14 files changed, 515 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index d1e7d49..07b99f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -69,8 +69,9 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
   }
 
   @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException {
-    return new AvroRecordReader(context, fileWork.getPath(), fileWork.getStart(), fileWork.getLength(), dfs, columns);
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+    return new AvroRecordReader(context, fileWork.getPath(), fileWork.getStart(), fileWork.getLength(), dfs, columns,
+      userName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 210ed9d..c12ff1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -52,12 +53,14 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Stopwatch;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * A RecordReader implementation for Avro data files.
@@ -78,6 +81,9 @@ public class AvroRecordReader extends AbstractRecordReader {
   private OperatorContext operatorContext;
   private FileSystem fs;
 
+  private final String opUserName;
+  private final String queryUserName;
+
   private static final int DEFAULT_BATCH_SIZE = 1000;
 
 
@@ -86,8 +92,9 @@ public class AvroRecordReader extends AbstractRecordReader {
                           final long start,
                           final long length,
                           final FileSystem fileSystem,
-                          final List<SchemaPath> projectedColumns) {
-    this(fragmentContext, inputPath, start, length, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
+                          final List<SchemaPath> projectedColumns,
+                          final String userName) {
+    this(fragmentContext, inputPath, start, length, fileSystem, projectedColumns, userName, DEFAULT_BATCH_SIZE);
   }
 
   public AvroRecordReader(final FragmentContext fragmentContext,
@@ -95,24 +102,42 @@ public class AvroRecordReader extends AbstractRecordReader {
                           final long start,
                           final long length,
                           final FileSystem fileSystem,
-                          List<SchemaPath> projectedColumns, final int defaultBatchSize) {
+                          List<SchemaPath> projectedColumns,
+                          final String userName,
+                          final int defaultBatchSize) {
 
     hadoop = new Path(inputPath);
     this.start = start;
     this.end = start + length;
     buffer = fragmentContext.getManagedBuffer();
     this.fs = fileSystem;
-
+    this.opUserName = userName;
+    this.queryUserName = fragmentContext.getQueryUserName();
     setColumns(projectedColumns);
   }
 
+  private DataFileReader getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<DataFileReader>() {
+        @Override
+        public DataFileReader run() throws Exception {
+          return new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating avro reader for file: %s", hadoop), e);
+    }
+  }
+
   @Override
   public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
     operatorContext = context;
     writer = new VectorContainerWriter(output);
 
     try {
-      reader = new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
+      reader = getReader(hadoop, fs);
       logger.debug("Processing file : {}, start position : {}, end position : {} ", hadoop, start, end);
       reader.sync(this.start);
     } catch (IOException e) {

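A note on the change above: the Avro reader is now constructed inside ugi.doAs(), so the file open happens under the proxy identity built from the operator and query user names. This is the "hadoop based record reader under user proxy" mentioned in the commit message, and the same pattern appears again in SequenceFileRecordReader below. A minimal standalone sketch of the underlying Hadoop idiom, independent of Drill's ImpersonationUtil (the class and method names in the sketch are illustrative):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyOpenSketch {
  // Opens a file as proxyUser, authenticated through the current process user.
  public static FSDataInputStream openAs(final String proxyUser, final Configuration conf,
                                         final Path path) throws Exception {
    final UserGroupInformation ugi =
        UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getCurrentUser());
    return ugi.doAs(new PrivilegedExceptionAction<FSDataInputStream>() {
      @Override
      public FSDataInputStream run() throws Exception {
        // Everything inside run() executes with proxyUser's identity, so DFS
        // permission checks apply to the proxy user, not the server process user.
        return FileSystem.get(conf).open(path);
      }
    });
  }
}
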
http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 24fa91b..12e00f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -123,7 +123,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   }
 
   public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-      List<SchemaPath> columns) throws ExecutionSetupException;
+      List<SchemaPath> columns, String userName) throws ExecutionSetupException;
 
   CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
     String partitionDesignator = context.getOptions()
@@ -173,7 +173,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     }
 
     for(FileWork work : scan.getWorkUnits()){
-      readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
+      readers.add(getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName()));
       if (scan.getSelectionRoot() != null) {
         String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(scan.getSelectionRoot())).toString().split("/");
         String[] p = Path.getPathWithoutSchemeAndAuthority(new Path(work.getPath())).toString().split("/");

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 75ad37a..015fcf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -62,7 +62,7 @@ public class
 
   @Override
   public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-      List<SchemaPath> columns) throws ExecutionSetupException {
+      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
     return new JSONRecordReader(context, fileWork.getPath(), dfs, columns);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
new file mode 100644
index 0000000..75e30c5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.List;
+
+@JsonTypeName("sequencefile") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class SequenceFileFormatConfig implements FormatPluginConfig {
+
+  public List<String> extensions;
+
+  @Override
+  public int hashCode() {
+    return (extensions == null)? 0 : extensions.hashCode();
+  }
+
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    } else if (obj == null) {
+      return false;
+    } else if (getClass() == obj.getClass()) {
+      SequenceFileFormatConfig other = (SequenceFileFormatConfig) obj;
+      return (extensions == null)? (other.extensions == null) : extensions.equals(other.extensions);
+    }
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
new file mode 100644
index 0000000..f5dcb7d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileFormatConfig> {
+  public SequenceFileFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+                                  StoragePluginConfig storageConfig) {
+    this(name, context, fsConf, storageConfig, new SequenceFileFormatConfig());
+  }
+
+  public SequenceFileFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+                                  StoragePluginConfig storageConfig, SequenceFileFormatConfig formatConfig) {
+    super(name, context, fsConf, storageConfig, formatConfig,
+      true, false, /* splittable = */ true, /* compressible = */ true,
+      formatConfig.getExtensions(), "sequencefile");
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+    return true;
+  }
+
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+    throws IOException {
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context,
+                                      DrillFileSystem dfs,
+                                      FileWork fileWork,
+                                      List<SchemaPath> columns,
+                                      String userName) throws ExecutionSetupException {
+    final Path path = dfs.makeQualified(new Path(fileWork.getPath()));
+    final FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
+    return new SequenceFileRecordReader(split, dfs, context.getQueryUserName(), userName);
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+    return 4001;
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
new file mode 100644
index 0000000..24eed8a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.security.PrivilegedExceptionAction;
+
+import com.google.common.base.Stopwatch;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+
+
+public class SequenceFileRecordReader extends AbstractRecordReader {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SequenceFileRecordReader.class);
+
+  private static final int PER_BATCH_RECORD_COUNT = 4096;
+  private static final int PER_BATCH_BYTES = 256*1024;
+
+  private static final MajorType KEY_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY);
+  private static final MajorType VALUE_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY);
+
+  private final SchemaPath keySchema = SchemaPath.getSimplePath("binary_key");
+  private final SchemaPath valueSchema = SchemaPath.getSimplePath("binary_value");
+
+  private NullableVarBinaryVector keyVector;
+  private NullableVarBinaryVector valueVector;
+  private final FileSplit split;
+  private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> reader;
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private final DrillFileSystem dfs;
+  private final String queryUserName;
+  private final String opUserName;
+
+  public SequenceFileRecordReader(final FileSplit split,
+                                  final DrillFileSystem dfs,
+                                  final String queryUserName,
+                                  final String opUserName) {
+    final List<SchemaPath> columns = new ArrayList<>();
+    columns.add(keySchema);
+    columns.add(valueSchema);
+    setColumns(columns);
+    this.dfs = dfs;
+    this.split = split;
+    this.queryUserName = queryUserName;
+    this.opUserName = opUserName;
+  }
+
+  @Override
+  protected boolean isSkipQuery() {
+    return false;
+  }
+
+  private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat,
+    final JobConf jobConf) throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    final JobConf jobConf = new JobConf(dfs.getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+    final MaterializedField keyField = MaterializedField.create(keySchema, KEY_TYPE);
+    final MaterializedField valueField = MaterializedField.create(valueSchema, VALUE_TYPE);
+    try {
+      keyVector = output.addField(keyField, NullableVarBinaryVector.class);
+      valueVector = output.addField(valueField, NullableVarBinaryVector.class);
+    } catch (SchemaChangeException sce) {
+      throw new ExecutionSetupException("Error in setting up sequencefile reader.", sce);
+    }
+  }
+
+  @Override
+  public int next() {
+    final Stopwatch watch = new Stopwatch();
+    watch.start();
+    if (keyVector != null) {
+      keyVector.clear();
+      keyVector.allocateNew();
+    }
+    if (valueVector != null) {
+      valueVector.clear();
+      valueVector.allocateNew();
+    }
+    int recordCount = 0;
+    int batchSize = 0;
+    try {
+      while (recordCount < PER_BATCH_RECORD_COUNT && batchSize < PER_BATCH_BYTES && reader.next(key, value)) {
+        keyVector.getMutator().setSafe(recordCount, key.getBytes(), 0, key.getLength());
+        valueVector.getMutator().setSafe(recordCount, value.getBytes(), 0, value.getLength());
+        batchSize += (key.getLength() + value.getLength());
+        ++recordCount;
+      }
+      keyVector.getMutator().setValueCount(recordCount);
+      valueVector.getMutator().setValueCount(recordCount);
+      logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+      return recordCount;
+    } catch (IOException ioe) {
+      close();
+      throw UserException.dataReadError(ioe).addContext("File Path", split.getPath().toString()).build(logger);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (IOException e) {
+      logger.warn("Exception closing reader: {}", e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 967d920..d40e1fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -78,7 +78,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
 
   @Override
   public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-      List<SchemaPath> columns) throws ExecutionSetupException {
+      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
     Path path = dfs.makeQualified(new Path(fileWork.getPath()));
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new
String[]{""});
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 1801f1c..8d405d0 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -38,6 +38,10 @@
         },
         "avro" : {
           type: "avro"
+        },
+        "sequencefile": {
+          type : "sequencefile",
+          extensions: [ "seq" ]
         }
       }
     },
@@ -65,6 +69,10 @@
         },
         "avro" : {
           type: "avro"
+        },
+        "sequencefile": {
+          type : "sequencefile",
+          extensions: [ "seq" ]
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
index e6d93e9..0d30f3f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java
@@ -24,10 +24,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.dotdrill.DotDrillType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -38,6 +40,8 @@ import java.io.File;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
+
 public class BaseTestImpersonation extends PlanTestBase {
   protected static final String MINIDFS_STORAGE_PLUGIN_NAME = "miniDfsPlugin";
   protected static final String processUser = System.getProperty("user.name");
@@ -152,4 +156,37 @@ public class BaseTestImpersonation extends PlanTestBase {
       FileUtils.deleteQuietly(new File(miniDfsStoragePath));
     }
   }
-}
+
+  // Return the user workspace for given user.
+  protected static String getWSSchema(String user) {
+    return MINIDFS_STORAGE_PLUGIN_NAME + "." + user;
+  }
+
+  protected static String getUserHome(String user) {
+    return "/user/" + user;
+  }
+
+  protected static void createView(final String viewOwner, final String viewGroup, final short viewPerms,
+                                 final String newViewName, final String fromSourceSchema, final String fromSourceTableName) throws Exception {
+    updateClient(viewOwner);
+    test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, viewPerms));
+    test(String.format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s;",
+      getWSSchema(viewOwner), newViewName, fromSourceSchema, fromSourceTableName));
+
+    // Verify the view file created has the expected permissions and ownership
+    Path viewFilePath = new Path(getUserHome(viewOwner), newViewName + DotDrillType.VIEW.getEnding());
+    FileStatus status = fs.getFileStatus(viewFilePath);
+    assertEquals(viewGroup, status.getGroup());
+    assertEquals(viewOwner, status.getOwner());
+    assertEquals(viewPerms, status.getPermission().toShort());
+  }
+
+  protected static void createView(final String viewOwner, final String viewGroup, final String viewName,
+                                 final String viewDef) throws Exception {
+    updateClient(viewOwner);
+    test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, (short) 0750));
+    test("CREATE VIEW %s.%s.%s AS %s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", viewName, viewDef);
+    final Path viewFilePath = new Path("/tmp/", viewName + DotDrillType.VIEW.getEnding());
+    fs.setOwner(viewFilePath, viewOwner, viewGroup);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index f00dc55..6709e43 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -19,10 +19,9 @@ package org.apache.drill.exec.impersonation;
 
 import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.dotdrill.DotDrillType;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.store.avro.AvroTestUtil;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -33,9 +32,9 @@ import org.junit.Test;
 import java.util.Map;
 
 import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertNull;
 
 /**
  * Test queries involving direct impersonation and multilevel impersonation including join queries where each side is
@@ -63,15 +62,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
 
     createNestedTestViewsOnLineItem();
     createNestedTestViewsOnOrders();
-  }
-
-  private static String getUserHome(String user) {
-    return "/user/" + user;
-  }
-
-  // Return the user workspace for given user.
-  private static String getWSSchema(String user) {
-    return MINIDFS_STORAGE_PLUGIN_NAME + "." + user;
+    createRecordReadersData(org1Users[0], org1Groups[0]);
   }
 
   private static Map<String, WorkspaceConfig> createTestWorkspaces() throws Exception {
@@ -83,14 +74,14 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
     Map<String, WorkspaceConfig> workspaces = Maps.newHashMap();
 
     // create user directory (ex. "/user/user0_1", with ownership "user0_1:group0_1" and perms 755) for every user.
-    for(int i=0; i<org1Users.length; i++) {
+    for (int i = 0; i < org1Users.length; i++) {
       final String user = org1Users[i];
       final String group = org1Groups[i];
       createAndAddWorkspace(user, getUserHome(user), (short)0755, user, group, workspaces);
     }
 
     // create user directory (ex. "/user/user0_2", with ownership "user0_2:group0_2" and perms 755) for every user.
-    for(int i=0; i<org2Users.length; i++) {
+    for (int i = 0; i < org2Users.length; i++) {
       final String user = org2Users[i];
       final String group = org2Groups[i];
       createAndAddWorkspace(user, getUserHome(user), (short)0755, user, group, workspaces);
@@ -122,11 +113,11 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
 
     // Create a view on top of u1_lineitem view
     // /user/user2_1    u2_lineitem    750    user2_1:group2_1
-    createView(org1Users[2], org1Groups[2], (short)0750, "u2_lineitem", getWSSchema(org1Users[1]), "u1_lineitem");
+    createView(org1Users[2], org1Groups[2], (short) 0750, "u2_lineitem", getWSSchema(org1Users[1]), "u1_lineitem");
 
     // Create a view on top of u2_lineitem view
     // /user/user2_1    u22_lineitem    750    user2_1:group2_1
-    createView(org1Users[2], org1Groups[2], (short)0750, "u22_lineitem", getWSSchema(org1Users[2]), "u2_lineitem");
+    createView(org1Users[2], org1Groups[2], (short) 0750, "u22_lineitem", getWSSchema(org1Users[2]), "u2_lineitem");
 
     // Create a view on top of u22_lineitem view
     // /user/user3_1    u3_lineitem    750    user3_1:group3_1
@@ -134,7 +125,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
 
     // Create a view on top of u3_lineitem view
     // /user/user4_1    u4_lineitem    755    user4_1:group4_1
-    createView(org1Users[4], org1Groups[4], (short)0755, "u4_lineitem", getWSSchema(org1Users[3]), "u3_lineitem");
+    createView(org1Users[4], org1Groups[4], (short) 0755, "u4_lineitem", getWSSchema(org1Users[3]), "u3_lineitem");
   }
 
   private static void createNestedTestViewsOnOrders() throws Exception {
@@ -164,19 +155,20 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
     createView(org2Users[4], org2Groups[4], (short)0755, "u4_orders", getWSSchema(org2Users[3]), "u3_orders");
   }
 
-  private static void createView(final String viewOwner, final String viewGroup, final short viewPerms,
-      final String newViewName, final String fromSourceSchema, final String fromSourceTableName) throws Exception {
-    updateClient(viewOwner);
-    test(String.format("ALTER SESSION SET `%s`='%o';", ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY, viewPerms));
-    test(String.format("CREATE VIEW %s.%s AS SELECT * FROM %s.%s;",
-        getWSSchema(viewOwner), newViewName, fromSourceSchema, fromSourceTableName));
-
-    // Verify the view file created has the expected permissions and ownership
-    Path viewFilePath = new Path(getUserHome(viewOwner), newViewName + DotDrillType.VIEW.getEnding());
-    FileStatus status = fs.getFileStatus(viewFilePath);
-    assertEquals(viewGroup, status.getGroup());
-    assertEquals(viewOwner, status.getOwner());
-    assertEquals(viewPerms, status.getPermission().toShort());
+  private static void createRecordReadersData(String user, String group) throws Exception {
+    // copy sequence file
+    updateClient(user);
+    Path localFile = new Path(FileUtils.getResourceAsFile("/sequencefiles/simple.seq").toURI().toString());
+    Path dfsFile = new Path(getUserHome(user), "simple.seq");
+    fs.copyFromLocalFile(localFile, dfsFile);
+    fs.setOwner(dfsFile, user, group);
+    fs.setPermission(dfsFile, new FsPermission((short) 0700));
+
+    localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues());
+    dfsFile = new Path(getUserHome(user), "simple.avro");
+    fs.copyFromLocalFile(localFile, dfsFile);
+    fs.setOwner(dfsFile, user, group);
+    fs.setPermission(dfsFile, new FsPermission((short) 0700));
   }
 
   @Test
@@ -209,11 +201,10 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
 
     assertNotNull("UserRemoteException is expected", ex);
     assertThat(ex.getMessage(), containsString("PERMISSION ERROR: " +
-            String.format("Not authorized to read table [lineitem] in schema [%s.user0_1]",
-                MINIDFS_STORAGE_PLUGIN_NAME)));
+      String.format("Not authorized to read table [lineitem] in schema [%s.user0_1]",
+        MINIDFS_STORAGE_PLUGIN_NAME)));
   }
 
-
   @Test
   public void testMultiLevelImpersonationEqualToMaxUserHops() throws Exception {
     updateClient(org1Users[4]);
@@ -227,21 +218,21 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
     try {
       updateClient(org1Users[5]);
       test(String.format("SELECT * from %s.u4_lineitem LIMIT 1;", getWSSchema(org1Users[4])));
-    } catch(UserRemoteException e) {
+    } catch (UserRemoteException e) {
       ex = e;
     }
 
     assertNotNull("UserRemoteException is expected", ex);
     assertThat(ex.getMessage(),
-        containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
-            "of user hops (3) in chained impersonation"));
+      containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
+        "of user hops (3) in chained impersonation"));
   }
 
   @Test
   public void testMultiLevelImpersonationJoinEachSideReachesMaxUserHops() throws Exception {
     updateClient(org1Users[4]);
     test(String.format("SELECT * from %s.u4_lineitem l JOIN %s.u3_orders o ON l.l_orderkey = o.o_orderkey LIMIT 1;",
-        getWSSchema(org1Users[4]), getWSSchema(org2Users[3])));
+      getWSSchema(org1Users[4]), getWSSchema(org2Users[3])));
   }
 
   @Test
@@ -258,8 +249,43 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
 
     assertNotNull("UserRemoteException is expected", ex);
     assertThat(ex.getMessage(),
-        containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
-            "of user hops (3) in chained impersonation"));
+      containsString("Cannot issue token for view expansion as issuing the token exceeds the maximum allowed number " +
+        "of user hops (3) in chained impersonation"));
+  }
+
+  @Test
+  public void sequenceFileChainedImpersonationWithView() throws Exception {
+    // create a view named "simple_seq_view" on "simple.seq". View is owned by user0:group0 and has permissions 750
+    createView(org1Users[0], org1Groups[0], "simple_seq_view",
+      String.format("SELECT convert_from(t.binary_key, 'UTF8') as k FROM %s.`%s` t", MINIDFS_STORAGE_PLUGIN_NAME,
+        new Path(getUserHome(org1Users[0]), "simple.seq")));
+    try {
+      updateClient(org1Users[1]);
+      test("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view");
+    } catch (UserRemoteException e) {
+      assertNull("This test should pass.", e);
+    }
+    createView(org1Users[1], org1Groups[1], "simple_seq_view_2",
+      String.format("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view"));
+    try {
+      updateClient(org1Users[2]);
+      test("SELECT k FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_seq_view_2");
+    } catch (UserRemoteException e) {
+      assertNull("This test should pass.", e);
+    }
+  }
+
+  @Test
+  public void avroChainedImpersonationWithView() throws Exception {
+    createView(org1Users[0], org1Groups[0], "simple_avro_view",
+      String.format("SELECT h_boolean, e_double FROM %s.`%s` t", MINIDFS_STORAGE_PLUGIN_NAME,
+        new Path(getUserHome(org1Users[0]), "simple.avro")));
+    try {
+      updateClient(org1Users[1]);
+      test("SELECT h_boolean FROM %s.%s.%s", MINIDFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
+    } catch (UserRemoteException e) {
+      assertNull("This test should pass.", e);
+    }
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
new file mode 100644
index 0000000..f48eb2c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sequencefile;
+
+import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+public class TestSequenceFileReader extends BaseTestQuery {
+
+  public static String byteWritableString(String input) throws Exception {
+    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bout);
+    final BytesWritable writable = new BytesWritable(input.getBytes("UTF-8"));
+    writable.write(out);
+    return new String(bout.toByteArray());
+  }
+
+  @Test
+  public void testSequenceFileReader() throws Exception {
+    String root = FileUtils.getResourceAsFile("/sequencefiles/simple.seq").toURI().toString();
+    final String query = String.format("select convert_from(t.binary_key, 'UTF8') as k, convert_from(t.binary_value, 'UTF8') as v " +
+      "from dfs_test.`%s` t", root);
+    testBuilder()
+      .sqlQuery(query)
+      .ordered()
+      .baselineColumns("k", "v")
+      .baselineValues(byteWritableString("key0"), byteWritableString("value0"))
+      .baselineValues(byteWritableString("key1"), byteWritableString("value1"))
+      .build().run();
+  }
+}

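The simple.seq fixture this test reads is a binary file (see the final hunk), so it cannot be reviewed inline. For reference, a sketch of how an equivalent two-record file could be generated with the stock Hadoop writer; the output path is hypothetical and the key/value pairs are inferred from the test's baseline values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;

public class MakeSimpleSeq {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    // Hypothetical output location; the committed fixture lives under
    // exec/java-exec/src/test/resources/sequencefiles/.
    final Path path = new Path("/tmp/simple.seq");
    final SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(BytesWritable.class),
        SequenceFile.Writer.valueClass(BytesWritable.class));
    try {
      // Two key/value pairs matching the baselines in TestSequenceFileReader.
      for (int i = 0; i < 2; i++) {
        writer.append(new BytesWritable(("key" + i).getBytes("UTF-8")),
                      new BytesWritable(("value" + i).getBytes("UTF-8")));
      }
    } finally {
      writer.close();
    }
  }
}
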
http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index 452baaf..57b9fbb 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -47,6 +47,10 @@
         },
         "avro" : {
           type: "avro"
+        },
+        "sequencefile": {
+          type: "sequencefile",
+          extensions: [ "seq" ]
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/9edce154/exec/java-exec/src/test/resources/sequencefiles/simple.seq
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sequencefiles/simple.seq b/exec/java-exec/src/test/resources/sequencefiles/simple.seq
new file mode 100644
index 0000000..db37061
Binary files /dev/null and b/exec/java-exec/src/test/resources/sequencefiles/simple.seq differ

