drill-commits mailing list archives

From: ve...@apache.org
Subject: [2/4] drill git commit: DRILL-2514: Add support for impersonation in FileSystem storage plugin.
Date: Tue, 21 Apr 2015 22:21:37 GMT
DRILL-2514: Add support for impersonation in FileSystem storage plugin.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40c90403
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40c90403
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40c90403

Branch: refs/heads/master
Commit: 40c90403255d55b412efe2c3a78289bd75325119
Parents: 117b749
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Wed Mar 18 15:51:44 2015 -0700
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Tue Apr 21 13:16:00 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  17 +-
 .../store/hbase/HBasePushFilterIntoScan.java    |   3 +-
 .../exec/store/hbase/HBaseSchemaFactory.java    |   4 +-
 .../exec/store/hbase/HBaseStoragePlugin.java    |  10 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |   7 +-
 .../HivePushPartitionFilterIntoScan.java        |   2 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |  11 +-
 .../exec/store/hive/HiveStoragePlugin.java      |  10 +-
 .../drill/exec/store/hive/HiveSubScan.java      |   6 +-
 .../store/hive/schema/HiveSchemaFactory.java    |   4 +-
 .../drill/exec/store/mongo/MongoGroupScan.java  |  13 +-
 .../store/mongo/MongoPushDownFilterForScan.java |   2 +-
 .../exec/store/mongo/MongoStoragePlugin.java    |  15 +-
 .../drill/exec/store/mongo/MongoSubScan.java    |   7 +-
 .../store/mongo/schema/MongoSchemaFactory.java  |   6 +-
 .../src/resources/drill-override-example.conf   |   4 +
 .../org/apache/drill/exec/ExecConstants.java    |   6 +
 .../drill/exec/dotdrill/DotDrillFile.java       |  10 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  30 +++-
 .../org/apache/drill/exec/ops/QueryContext.java |  61 ++++++-
 .../drill/exec/ops/ViewExpansionContext.java    | 175 +++++++++++++++++++
 .../apache/drill/exec/opt/BasicOptimizer.java   |   8 +-
 .../drill/exec/physical/base/AbstractBase.java  |  20 +++
 .../physical/base/AbstractFileGroupScan.java    |   7 +
 .../exec/physical/base/AbstractGroupScan.java   |   8 +
 .../exec/physical/base/AbstractSubScan.java     |   4 +
 .../exec/physical/base/PhysicalOperator.java    |   8 +
 .../drill/exec/physical/impl/ImplCreator.java   | 128 +++++++++-----
 .../drill/exec/planner/logical/DrillTable.java  |  39 ++++-
 .../exec/planner/logical/DrillViewTable.java    |  37 ++--
 .../exec/planner/logical/DynamicDrillTable.java |   9 +
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   3 +
 .../sql/handlers/CreateTableHandler.java        |   4 +-
 .../exec/planner/torel/ConversionContext.java   |   6 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../drill/exec/store/AbstractStoragePlugin.java |   6 +-
 .../apache/drill/exec/store/SchemaConfig.java   |  93 ++++++++++
 .../apache/drill/exec/store/SchemaFactory.java  |  14 +-
 .../apache/drill/exec/store/StoragePlugin.java  |  18 +-
 .../drill/exec/store/StoragePluginRegistry.java |   6 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |  12 +-
 .../exec/store/dfs/FileSystemSchemaFactory.java |  11 +-
 .../drill/exec/store/dfs/FormatPlugin.java      |   4 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  85 ++++++---
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  14 +-
 .../exec/store/dfs/easy/EasyGroupScan.java      |  17 +-
 .../drill/exec/store/dfs/easy/EasySubScan.java  |   7 +-
 .../exec/store/direct/DirectGroupScan.java      |   2 +-
 .../drill/exec/store/direct/DirectSubScan.java  |   2 +-
 .../exec/store/easy/text/TextFormatPlugin.java  |   5 +-
 .../exec/store/ischema/InfoSchemaGroupScan.java |   2 +
 .../store/ischema/InfoSchemaStoragePlugin.java  |   7 +-
 .../exec/store/ischema/InfoSchemaSubScan.java   |   1 +
 .../exec/store/ischema/RecordGenerator.java     |   8 +-
 .../drill/exec/store/mock/MockGroupScanPOP.java |   2 +-
 .../exec/store/mock/MockStorageEngine.java      |   7 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |  11 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  89 ++++++----
 .../exec/store/parquet/ParquetRowGroupScan.java |  12 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   8 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |   7 +-
 .../drill/exec/store/sys/SystemTableScan.java   |   2 +
 .../drill/exec/util/ImpersonationUtil.java      | 162 +++++++++++++++++
 .../apache/drill/exec/work/foreman/Foreman.java |   2 +-
 .../exec/work/fragment/FragmentExecutor.java    |  27 ++-
 .../src/main/resources/drill-module.conf        |   4 +
 .../java/org/apache/drill/PlanningBase.java     |   3 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 .../exec/testing/TestExceptionInjection.java    |  11 +-
 .../drill/exec/testing/TestPauseInjection.java  |   6 +-
 pom.xml                                         |   2 +-
 71 files changed, 1097 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
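
Editorially, the heart of this change is one pattern repeated across every storage plugin: the name of the user who issued the query is threaded through each group scan and sub-scan so the plugin can impersonate that user when touching storage. A minimal sketch of that shape, assuming the super(userName)/super(that) constructors this commit adds to AbstractGroupScan; ExampleGroupScan is a hypothetical stand-in for the HBase/Hive/Mongo scans changed below.

    // A sketch, not part of the commit: the shape every scan below now follows.
    public abstract class ExampleGroupScan extends AbstractGroupScan {
      protected ExampleGroupScan(String userName) {
        super(userName);   // user to impersonate when this scan touches storage
      }

      protected ExampleGroupScan(ExampleGroupScan that) {
        super(that);       // clone constructors must preserve the user
      }
      // getSpecificScan(...) implementations pass getUserName() into the sub-scan
    }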


http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 6d18d12..e52e2e4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -101,14 +101,17 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   private long scanSizeInBytes = 0;
 
   @JsonCreator
-  public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
+  public HBaseGroupScan(@JsonProperty("userName") String userName,
+                        @JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
                         @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
                         @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
-    this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
+    this (userName, (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
   }
 
-  public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
+  public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
+      List<SchemaPath> columns) {
+    super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.hbaseScanSpec = scanSpec;
@@ -121,6 +124,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
    * @param that The HBaseGroupScan to clone
    */
   private HBaseGroupScan(HBaseGroupScan that) {
+    super(that);
     this.columns = that.columns;
     this.hbaseScanSpec = that.hbaseScanSpec;
     this.endpointFragmentMapping = that.endpointFragmentMapping;
@@ -342,7 +346,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     assert minorFragmentId < endpointFragmentMapping.size() : String.format(
         "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
         minorFragmentId);
-    return new HBaseSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns);
+    return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig,
+        endpointFragmentMapping.get(minorFragmentId), columns);
   }
 
   @Override
@@ -427,7 +432,9 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
    * Empty constructor, do not use, only for testing.
    */
   @VisibleForTesting
-  public HBaseGroupScan() { }
+  public HBaseGroupScan() {
+    super((String)null);
+  }
 
   /**
    * Do not use, only for testing.

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 2b419d4..c395b43 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -62,7 +62,8 @@ public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
       return; //no filter pushdown ==> No transformation.
     }
 
-    final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+    final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+        newScanSpec, groupScan.getColumns());
     newGroupsScan.setFilterPushedDown(true);
 
     final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 1c407e1..47d08b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -25,8 +25,8 @@ import net.hydromatic.optiq.Schema;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.Table;
 
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -46,7 +46,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     HBaseSchema schema = new HBaseSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index 948d462..2214c50 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -23,9 +23,9 @@ import java.util.Set;
 import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.JSONOptions;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -60,14 +60,14 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
+  public HBaseGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     HBaseScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<HBaseScanSpec>() {});
-    return new HBaseGroupScan(this, scanSpec, null);
+    return new HBaseGroupScan(userName, this, scanSpec, null);
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 23d8c5a..96ae257 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -55,17 +55,20 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
 
   @JsonCreator
   public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
                       @JsonProperty("storage") StoragePluginConfig storage,
                       @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName);
     hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
     this.regionScanSpecList = regionScanSpecList;
     this.storage = (HBaseStoragePluginConfig) storage;
     this.columns = columns;
   }
 
-  public HBaseSubScan(HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
+  public HBaseSubScan(String userName, HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
       List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) {
+    super(userName);
     hbaseStoragePlugin = plugin;
     storage = config;
     this.regionScanSpecList = regionInfoList;
@@ -103,7 +106,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
+    return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, regionScanSpecList, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
index 374c486..9b93ac0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java
@@ -132,7 +132,7 @@ public abstract class HivePushPartitionFilterIntoScan extends StoragePluginOptim
 
     try {
       HiveScan oldScan = (HiveScan) scanRel.getGroupScan();
-      HiveScan hiveScan = new HiveScan(newReadEntry, oldScan.storagePlugin, oldScan.columns);
+      HiveScan hiveScan = new HiveScan(oldScan.getUserName(), newReadEntry, oldScan.storagePlugin, oldScan.columns);
       PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, hiveScan, builder);
     } catch (ExecutionSetupException e) {
       throw new DrillRuntimeException(e);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 92635a8..8a2e498 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -94,10 +94,12 @@ public class HiveScan extends AbstractGroupScan {
   private long rowCount = 0;
 
   @JsonCreator
-  public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+  public HiveScan(@JsonProperty("userName") final String userName,
+                  @JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
                   @JsonProperty("storage-plugin") final String storagePluginName,
                   @JsonProperty("columns") final List<SchemaPath> columns,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+    super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.storagePluginName = storagePluginName;
     this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +108,8 @@ public class HiveScan extends AbstractGroupScan {
     endpoints = storagePlugin.getContext().getBits();
   }
 
-  public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+  public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.storagePlugin = storagePlugin;
@@ -116,6 +119,7 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   private HiveScan(final HiveScan that) {
+    super(that);
     this.columns = that.columns;
     this.endpoints = that.endpoints;
     this.hiveReadEntry = that.hiveReadEntry;
@@ -226,8 +230,9 @@ public class HiveScan extends AbstractGroupScan {
       if (parts.contains(null)) {
         parts = null;
       }
+
       final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
-      return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 91e7a92..a19ebb8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -29,9 +29,9 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
 
@@ -67,7 +67,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
     HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
     try {
       if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
@@ -75,15 +75,15 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
             "Querying views created in Hive from Drill is not supported in current version.");
       }
 
-      return new HiveScan(hiveReadEntry, this, columns);
+      return new HiveScan(userName, hiveReadEntry, this, columns);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {
     return ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 1233202..2181c2a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -62,10 +62,12 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   private List<Partition> partitions;
 
   @JsonCreator
-  public HiveSubScan(@JsonProperty("splits") List<String> splits,
+  public HiveSubScan(@JsonProperty("userName") String userName,
+                     @JsonProperty("splits") List<String> splits,
                      @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns) throws IOException, ReflectiveOperationException {
+    super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.table = hiveReadEntry.getTable();
     this.partitions = hiveReadEntry.getPartitions();
@@ -126,7 +128,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     try {
-      return new HiveSubScan(splits, hiveReadEntry, splitClasses, columns);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 587e90d..ec30f01 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -30,8 +30,8 @@ import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveStoragePlugin;
@@ -187,7 +187,7 @@ public class HiveSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     HiveSchema schema = new HiveSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index b086786..54d34f9 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -115,17 +115,20 @@ public class MongoGroupScan extends AbstractGroupScan implements
   private boolean filterPushedDown = false;
 
   @JsonCreator
-  public MongoGroupScan(@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
+  public MongoGroupScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
       @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
       ExecutionSetupException {
-    this((MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
+    this(userName, (MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
         scanSpec, columns);
   }
 
-  public MongoGroupScan(MongoStoragePlugin storagePlugin,
+  public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
       MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException {
+    super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.scanSpec = scanSpec;
@@ -140,6 +143,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
    *          The MongoGroupScan to clone
    */
   private MongoGroupScan(MongoGroupScan that) {
+    super(that);
     this.scanSpec = that.scanSpec;
     this.columns = that.columns;
     this.storagePlugin = that.storagePlugin;
@@ -446,7 +450,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
   @Override
   public MongoSubScan getSpecificScan(int minorFragmentId)
       throws ExecutionSetupException {
-    return new MongoSubScan(storagePlugin, storagePluginConfig,
+    return new MongoSubScan(getUserName(), storagePlugin, storagePluginConfig,
         endpointFragmentMapping.get(minorFragmentId), columns);
   }
 
@@ -554,6 +558,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
 
   @VisibleForTesting
   MongoGroupScan() {
+    super((String)null);
   }
 
   @JsonIgnore

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
index 9af49b1..1d3b292 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -68,7 +68,7 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
 
     MongoGroupScan newGroupsScan = null;
     try {
-      newGroupsScan = new MongoGroupScan(groupScan.getStoragePlugin(),
+      newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
           newScanSpec, groupScan.getColumns());
     } catch (IOException e) {
       logger.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index dfad5ef..d291325 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -25,9 +25,9 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
 import org.slf4j.Logger;
@@ -63,8 +63,8 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
   @Override
@@ -73,12 +73,9 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection)
-      throws IOException {
-    MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(),
-        new TypeReference<MongoScanSpec>() {
-        });
-    return new MongoGroupScan(this, mongoScanSpec, null);
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {});
+    return new MongoGroupScan(userName, this, mongoScanSpec, null);
   }
 
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 36008cf..fb6e095 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -57,10 +57,12 @@ public class MongoSubScan extends AbstractBase implements SubScan {
   @JsonCreator
   public MongoSubScan(
       @JacksonInject StoragePluginRegistry registry,
+      @JsonProperty("userName") String userName,
       @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
       @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList,
       @JsonProperty("columns") List<SchemaPath> columns)
       throws ExecutionSetupException {
+    super(userName);
     this.columns = columns;
     this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
     this.mongoStoragePlugin = (MongoStoragePlugin) registry
@@ -68,9 +70,10 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     this.chunkScanSpecList = chunkScanSpecList;
   }
 
-  public MongoSubScan(MongoStoragePlugin storagePlugin,
+  public MongoSubScan(String userName, MongoStoragePlugin storagePlugin,
       MongoStoragePluginConfig storagePluginConfig,
       List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+    super(userName);
     this.mongoStoragePlugin = storagePlugin;
     this.mongoPluginConfig = storagePluginConfig;
     this.columns = columns;
@@ -105,7 +108,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new MongoSubScan(mongoStoragePlugin, mongoPluginConfig,
+    return new MongoSubScan(getUserName(), mongoStoragePlugin, mongoPluginConfig,
         chunkScanSpecList, columns);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index f650ccc..c941176 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -34,8 +34,8 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.mongo.MongoCnxnManager;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
@@ -120,7 +120,7 @@ public class MongoSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     MongoSchema schema = new MongoSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);
@@ -188,7 +188,7 @@ public class MongoSchemaFactory implements SchemaFactory {
 
     DrillTable getDrillTable(String dbName, String collectionName) {
       MongoScanSpec mongoScanSpec = new MongoScanSpec(dbName, collectionName);
-      return new DynamicDrillTable(plugin, schemaName, mongoScanSpec);
+      return new DynamicDrillTable(plugin, schemaName, null, mongoScanSpec);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 943d644..805d6e9 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -106,6 +106,10 @@ drill.exec: {
       write: true
     }
   },
+  impersonation: {
+    enabled: false,
+    max_chained_user_hops: 3
+  },
   security.user.auth {
     enabled: false,
     packages += "org.apache.drill.exec.rpc.user.security",

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index f7648b1..bafbbc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -75,6 +75,8 @@ public interface ExecConstants {
   public static final String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
   public static final String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
   public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
+  public static final String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
+  public static final String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
   public static final String USER_AUTHENTICATOR_IMPL_PACKAGES = "drill.exec.security.user.auth.packages";
   public static final String USER_AUTHENTICATION_ENABLED = "drill.exec.security.user.auth.enabled";
   public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
@@ -220,4 +222,8 @@ public interface ExecConstants {
   public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
   public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
     new ExecutionControls.ControlsOptionValidator(DRILLBIT_CONTROL_INJECTIONS, ExecutionControls.DEFAULT_CONTROLS, 1);
+
+  public static final String NEW_VIEW_DEFAULT_PERMS_KEY = "new_view_default_permissions";
+  public static final OptionValidator NEW_VIEW_DEFAULT_PERMS_VALIDATOR =
+      new StringValidator(NEW_VIEW_DEFAULT_PERMS_KEY, "700");
 }
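
These constants name the boot-time options shown in the drill-override-example.conf hunk above. A minimal sketch of reading them, assuming the standard DrillConfig bootstrap; the ImpersonationConfigCheck class is hypothetical:

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.ExecConstants;

    public class ImpersonationConfigCheck {
      public static void main(String[] args) {
        // Load drill-module.conf/drill-override.conf via the usual bootstrap.
        DrillConfig config = DrillConfig.create();
        System.out.println(config.getBoolean(ExecConstants.IMPERSONATION_ENABLED));
        System.out.println(config.getInt(ExecConstants.IMPERSONATION_MAX_CHAINED_USER_HOPS));
      }
    }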

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
index f9a8ff5..efa8cc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillFile.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
 
 import com.google.common.base.Preconditions;
 
+import java.io.IOException;
 import java.io.InputStream;
 
 public class DotDrillFile {
@@ -51,6 +52,13 @@ public class DotDrillFile {
   }
 
   /**
+   * @return Owner of the file in the underlying file system.
+   */
+  public String getOwner() {
+    return status.getOwner();
+  }
+
+  /**
    * Return base file name without the parent directory and extensions.
    * @return Base file name.
    */
@@ -59,7 +67,7 @@ public class DotDrillFile {
     return fileName.substring(0, fileName.lastIndexOf(type.getEnding()));
   }
 
-  public View getView(DrillConfig config) throws Exception{
+  public View getView(DrillConfig config) throws IOException {
     Preconditions.checkArgument(type == DotDrillType.VIEW);
     try(InputStream is = fs.open(status.getPath())){
       return config.getMapper().readValue(is, View.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 7dfd0e6..c566a10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -28,6 +28,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -48,7 +49,9 @@ import org.apache.drill.exec.server.options.FragmentOptionManager;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -201,7 +204,18 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
       return null;
     }
 
-    return queryContext.getRootSchema();
+    final boolean isImpersonationEnabled = isImpersonationEnabled();
+    // If impersonation is enabled, view the schema as the query user and suppress authorization errors: for
+    // InfoSchema purposes we want to show only the tables the user has permission to list or query. If impersonation
+    // is disabled, view the schema as the Drillbit process user and surface authorization errors to the client.
+    SchemaConfig schemaConfig = SchemaConfig
+        .newBuilder(
+            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
+            queryContext)
+        .setIgnoreAuthErrors(isImpersonationEnabled)
+        .build();
+
+    return queryContext.getRootSchema(schemaConfig);
   }
 
   /**
@@ -327,6 +341,20 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return executionControls;
   }
 
+  public String getQueryUserName() {
+    return fragment.getCredentials().getUserName();
+  }
+
+  public boolean isImpersonationEnabled() {
+    // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is
+    // no config
+    if (getConfig() == null) {
+      return false;
+    }
+
+    return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+  }
+
   @Override
   public void close() {
     waitForSendComplete();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index cd5c054..cc02658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -39,8 +40,10 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.exec.store.PartitionExplorerImpl;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
 
 // TODO except for a couple of tests, this is only created by Foreman
 // TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
@@ -49,6 +52,9 @@ import org.apache.drill.exec.testing.ExecutionControls;
 public class QueryContext implements AutoCloseable, UdfUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
+  private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
+  private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+
   private final DrillbitContext drillbitContext;
   private final UserSession session;
   private final OptionManager queryOptions;
@@ -59,8 +65,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
   private final BufferAllocator allocator;
   private final BufferManager bufferManager;
   private final QueryDateTimeInfo queryDateTimeInfo;
-  private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
-  private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+  private final ViewExpansionContext viewExpansionContext;
 
   /*
    * Flag to indicate if close has been called, after calling close the first
@@ -89,6 +94,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     }
     // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
     bufferManager = new BufferManager(this.allocator, null);
+    viewExpansionContext = new ViewExpansionContext(this);
   }
 
   public PlannerSettings getPlannerSettings() {
@@ -103,6 +109,13 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return allocator;
   }
 
+  /**
+   * Return reference to default schema instance in a schema tree. Each {@link net.hydromatic.optiq.SchemaPlus}
+   * instance can refer to its parent and its children. From the returned reference to default schema instance,
+   * clients can traverse the entire schema tree and locate the default schema in which tables are looked up first.
+   *
+   * @return Reference to default schema instance in a schema tree.
+   */
   public SchemaPlus getNewDefaultSchema() {
     final SchemaPlus rootSchema = getRootSchema();
     final SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
@@ -113,18 +126,52 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return defaultSchema;
   }
 
+  /**
+   * Get root schema with schema owner as the user who issued the query that is managed by this QueryContext.
+   * @return Root of the schema tree.
+   */
   public SchemaPlus getRootSchema() {
+    return getRootSchema(getQueryUserName());
+  }
+
+  /**
+   * Return root schema with schema owner as the given user.
+   *
+   * @param userName User who owns the schema tree.
+   * @return Root of the schema tree.
+   */
+  public SchemaPlus getRootSchema(String userName) {
+    final String schemaUser = isImpersonationEnabled() ? userName : ImpersonationUtil.getProcessUserName();
+    final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, this).build();
+    return getRootSchema(schemaConfig);
+  }
+
+  /**
+   * Create and return a schema tree built with the given <i>schemaConfig</i>.
+   * @param schemaConfig Configuration (user name and auth-error handling) used to build the schema tree.
+   * @return Root of the schema tree.
+   */
+  public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
     try {
       final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
-      drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
+      drillbitContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
       return rootSchema;
     } catch(IOException e) {
+      // We can't proceed further without a schema, throw a runtime exception.
       final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
       logger.error(errMsg, e);
       throw new DrillRuntimeException(errMsg, e);
     }
   }
 
+  /**
+   * Get the name of the user who issued the query that is managed by this QueryContext.
+   * @return User name of the query issuer.
+   */
+  public String getQueryUserName() {
+    return session.getCredentials().getUserName();
+  }
+
   public OptionManager getOptions() {
     return queryOptions;
   }
@@ -153,6 +200,14 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return drillbitContext.getFunctionImplementationRegistry();
   }
 
+  public ViewExpansionContext getViewExpansionContext() {
+    return viewExpansionContext;
+  }
+
+  public boolean isImpersonationEnabled() {
+     return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+  }
+
   public DrillOperatorTable getDrillOperatorTable() {
     return table;
   }
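
With this hunk, QueryContext exposes three ways to obtain a root schema. A hedged usage sketch, assuming a QueryContext in scope; queryContext and "someOwner" are placeholders:

    SchemaPlus asQueryUser = queryContext.getRootSchema();             // as the query user
    SchemaPlus asGivenUser = queryContext.getRootSchema("someOwner");  // as a named user
    SchemaConfig schemaConfig = SchemaConfig.newBuilder("someOwner", queryContext)
        .setIgnoreAuthErrors(true)   // e.g. while populating INFORMATION_SCHEMA
        .build();
    SchemaPlus viaConfig = queryContext.getRootSchema(schemaConfig);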

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
new file mode 100644
index 0000000..9d04ab9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ViewExpansionContext.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import com.google.common.base.Preconditions;
+import net.hydromatic.optiq.SchemaPlus;
+import org.apache.drill.common.exceptions.UserException;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelOptTable.ToRelContext;
+
+import static org.apache.drill.exec.ExecConstants.IMPERSONATION_MAX_CHAINED_USER_HOPS;
+
+/**
+ * Contains context information about view expansion(s) in a query. Part of
+ * {@link org.apache.drill.exec.ops.QueryContext}. Before expanding a view into its definition, as part of the
+ * {@link org.apache.drill.exec.planner.logical.DrillViewTable#toRel(ToRelContext, RelOptTable)}, first a
+ * {@link ViewExpansionToken} is requested from ViewExpansionContext through {@link #reserveViewExpansionToken(String)}.
+ * Once view expansion is complete, the token is released through {@link ViewExpansionToken#release()}. A view
+ * definition itself may contain zero or more views; a token is likewise obtained for expanding each nested view.
+ *
+ * Ex:
+ *   Following are the available view tables: { "view_1", "view_2", "view_3", "view_4" }. Corresponding owners are
+ *   {"view1Owner", "view2Owner", "view3Owner", "view4Owner"}.
+ *   Definition of "view4" : "SELECT field4 FROM view3"
+ *   Definition of "view3" : "SELECT field4, field3 FROM view2"
+ *   Definition of "view2" : "SELECT field4, field3, field2 FROM view1"
+ *   Definition of "view1" : "SELECT field4, field3, field2, field1 FROM someTable"
+ *
+ *   Query is: "SELECT * FROM view4".
+ *   Steps:
+ *     1. "view4" comes for expanding it into its definition
+ *     2. A token "view4Token" is requested through {@link #reserveViewExpansionToken(String view4Owner)}
+ *     3. "view4" is called for expansion. As part of it
+ *       3.1 "view3" comes for expansion
+ *       3.2 A token "view3Token" is requested through {@link #reserveViewExpansionToken(String view3Owner)}
+ *       3.3 "view3" is called for expansion. As part of it
+ *           3.3.1 "view2" comes for expansion
+ *           3.3.2 A token "view2Token" is requested through {@link #reserveViewExpansionToken(String view2Owner)}
+ *           3.3.3 "view2" is called for expansion. As part of it
+ *                 3.3.3.1 "view1" comes for expansion
+ *                 3.3.3.2 A token "view1Token" is requested through {@link #reserveViewExpansionToken(String view1Owner)}
+ *                 3.3.3.3 "view1" is called for expansion
+ *                 3.3.3.4 "view1" expansion is complete
+ *                 3.3.3.5 Token "view1Token" is released
+ *           3.3.4 "view2" expansion is complete
+ *           3.3.5 Token "view2Token" is released
+ *       3.4 "view3" expansion is complete
+ *       3.5 Token "view3Token" is released
+ *    4. "view4" expansion is complete
+ *    5. Token "view4Token" is released.
+ *
+ */
+public class ViewExpansionContext {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ViewExpansionContext.class);
+
+  private final QueryContext queryContext;
+  private final int maxChainedUserHops;
+  private final String queryUser;
+  private final ObjectIntOpenHashMap<String> userTokens = new ObjectIntOpenHashMap<>();
+
+  public ViewExpansionContext(QueryContext queryContext) {
+    this.queryContext = queryContext;
+    this.maxChainedUserHops =
+        queryContext.getConfig().getInt(IMPERSONATION_MAX_CHAINED_USER_HOPS);
+    this.queryUser = queryContext.getQueryUserName();
+  }
+
+  public boolean isImpersonationEnabled() {
+    return queryContext.isImpersonationEnabled();
+  }
+
+  /**
+   * Reserve a token for expanding a view owned by the given user. Throws {@link UserException} if no
+   * more tokens can be issued.
+   *
+   * @param viewOwner Name of the user who owns the view.
+   * @return An instance of {@link org.apache.drill.exec.ops.ViewExpansionContext.ViewExpansionToken} which must be
+   *         released when done using the token.
+   */
+  public ViewExpansionToken reserveViewExpansionToken(String viewOwner) {
+    int totalTokens = 1;
+    if (!viewOwner.equals(queryUser)) {
+      // We want to track the tokens only if the "viewOwner" is not same as the "queryUser".
+      if (userTokens.containsKey(viewOwner)) {
+        // If the user already exists, we don't need to validate the limit on maximum user hops in chained impersonation
+        // as the limit is for number of unique users.
+        totalTokens += userTokens.get(viewOwner);
+      } else {
+        // Make sure we are not exceeding the maximum number of impersonation user hops allowed in chained impersonation.
+        if (userTokens.size() == maxChainedUserHops) {
+          final String errMsg =
+              String.format("Cannot issue token for view expansion as issuing the token exceeds the " +
+                  "maximum allowed number of user hops (%d) in chained impersonation.", maxChainedUserHops);
+          logger.error(errMsg);
+          throw UserException.permissionError().message(errMsg).build();
+        }
+      }
+
+      userTokens.put(viewOwner, totalTokens);
+
+      logger.debug("Issued view expansion token for user '{}'", viewOwner);
+    }
+
+    return new ViewExpansionToken(viewOwner);
+  }
+
+  private void releaseViewExpansionToken(ViewExpansionToken token) {
+    final String viewOwner = token.viewOwner;
+
+    if (viewOwner.equals(queryUser)) {
+      // If the token owner and queryUser are same, no need to track the token release.
+      return;
+    }
+
+    Preconditions.checkState(userTokens.containsKey(token.viewOwner),
+        "Given user doesn't exist in User Token store. Make sure token for this user is obtained first.");
+
+    final int userTokenCount = userTokens.get(viewOwner);
+    if (userTokenCount == 1) {
+      // Remove the user from collection, when there are no more tokens issued to the user.
+      userTokens.remove(viewOwner);
+    } else {
+      userTokens.put(viewOwner, userTokenCount - 1);
+    }
+    logger.debug("Released view expansion token issued for user '{}'", viewOwner);
+  }
+
+  /**
+   * Represents token issued to a view owner for expanding the view.
+   */
+  public class ViewExpansionToken {
+    private final String viewOwner;
+
+    private boolean released;
+
+    ViewExpansionToken(String viewOwner) {
+      this.viewOwner = viewOwner;
+    }
+
+    /**
+     * Get the schema tree of the view owner to whom this token was issued.
+     * @return Root of schema tree.
+     */
+    public SchemaPlus getSchemaTree() {
+      Preconditions.checkState(!released, "Trying to use released token.");
+      return queryContext.getRootSchema(viewOwner);
+    }
+
+    /**
+     * Release the token. Once released, all method calls other than release() throw {@link java.lang.IllegalStateException}.
+     */
+    public void release() {
+      if (!released) {
+        released = true;
+        releaseViewExpansionToken(this);
+      }
+    }
+  }
+}
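
A minimal usage sketch of the class above (the names "viewExpansionContext" and
"viewOwner" are assumed to be in scope; this mirrors the reserve/expand/release
pattern that DrillViewTable adopts later in this commit). Note that tokens are
counted per unique owner: with IMPERSONATION_MAX_CHAINED_USER_HOPS set to N, at
most N distinct owners other than the query user can hold tokens at once.

    ViewExpansionContext.ViewExpansionToken token = null;
    try {
      token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
      // Schema tree rooted at the view owner; the view SQL is expanded against it.
      SchemaPlus ownerSchema = token.getSchemaTree();
      // ... expand the view definition against ownerSchema ...
    } finally {
      if (token != null) {
        // Safe to call more than once; only the first call decrements the count.
        token.release();
      }
    }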

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index b1a71a5..9e60f21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -53,6 +53,7 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.eigenbase.rel.RelFieldCollation.Direction;
@@ -69,9 +70,11 @@ public class BasicOptimizer extends Optimizer {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
 
   private final QueryContext queryContext;
+  private final UserClientConnection userSession;
 
-  public BasicOptimizer(final QueryContext queryContext) {
+  public BasicOptimizer(final QueryContext queryContext, final UserClientConnection userSession) {
     this.queryContext = queryContext;
+    this.userSession = userSession;
   }
 
   @Override
@@ -208,7 +211,8 @@ public class BasicOptimizer extends Optimizer {
       }
       try {
         final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
-        return storagePlugin.getPhysicalScan(scan.getSelection());
+        final String user = userSession.getSession().getCredentials().getUserName();
+        return storagePlugin.getPhysicalScan(user, scan.getSelection());
       } catch (IOException | ExecutionSetupException e) {
         throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index defb4e4..c7b0e7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -25,11 +25,26 @@ import com.google.common.base.Preconditions;
 public abstract class AbstractBase implements PhysicalOperator{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
 
+  private final String userName;
+
   protected long initialAllocation = 1000000L;
   protected long maxAllocation = 10000000000L;
   private int id;
   private double cost;
 
+  public AbstractBase() {
+    userName = null;
+  }
+
+  public AbstractBase(String userName) {
+    this.userName = userName;
+  }
+
+  public AbstractBase(AbstractBase that) {
+    Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+    this.userName = that.userName;
+  }
+
   @Override
   public void accept(GraphVisitor<PhysicalOperator> visitor) {
     visitor.enter(this);
@@ -48,6 +63,7 @@ public abstract class AbstractBase implements PhysicalOperator{
     return true;
   }
 
+  @Override
   public final void setOperatorId(int id) {
     this.id = id;
   }
@@ -80,4 +96,8 @@ public abstract class AbstractBase implements PhysicalOperator{
     return maxAllocation;
   }
 
+  @Override
+  public String getUserName() {
+    return userName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
index ee809fc..606aa4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
@@ -24,6 +24,13 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class);
 
+  public AbstractFileGroupScan(String userName) {
+    super(userName);
+  }
+
+  public AbstractFileGroupScan(AbstractFileGroupScan that) {
+    super(that);
+  }
 
   @Override
   public void modifyFileSelection(FileSelection selection) {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 8fe21e6..242bd5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -30,6 +30,14 @@ import org.apache.drill.exec.physical.EndpointAffinity;
 public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
 
+  public AbstractGroupScan(String userName) {
+    super(userName);
+  }
+
+  public AbstractGroupScan(AbstractGroupScan that) {
+    super(that);
+  }
+
   @Override
   public Iterator<PhysicalOperator> iterator() {
     return Iterators.emptyIterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
index a36a46e..5ec5698 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
@@ -29,6 +29,10 @@ import com.google.common.collect.Iterators;
 public abstract class AbstractSubScan extends AbstractBase implements SubScan{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
 
+  public AbstractSubScan(String userName) {
+    super(userName);
+  }
+
   @Override
   public boolean isExecutable() {
     return true;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index a5518ca..b1954ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -95,6 +95,14 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
   @JsonProperty("cost")
   public double getCost();
 
+  /**
+   * Name of the user to impersonate while setting up the implementation (RecordBatch) of this
+   * PhysicalOperator. Defaults to null, in which case the user who launched the query is impersonated.
+   * @return Name of the impersonation user, or null.
+   */
+  @JsonProperty("userName")
+  public String getUserName();
+
   @JsonIgnore
   public int getOperatorType();
 }
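
A short sketch of the default described in the javadoc above: a null userName means
the query user is impersonated. The fallback shown here is an illustration ('op' and
'context' are assumed to be in scope); ImplCreator below resolves the actual user
through ImpersonationUtil.

    // Null userName falls back to the user who launched the query.
    String effectiveUser = (op.getUserName() != null)
        ? op.getUserName()
        : context.getQueryUserName();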

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index e25f1c0..912dfd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -17,12 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
@@ -32,65 +33,112 @@ import org.apache.drill.exec.util.AssertionUtil;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
- * Implementation of the physical operator visitor
+ * Creates a RecordBatch tree (the runtime implementations of the PhysicalOperators) for a given PhysicalOperator tree.
  */
-public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException> {
+public class ImplCreator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
 
-  private RootExec root = null;
+  private static final ImplCreator INSTANCE = new ImplCreator();
 
   private ImplCreator() {}
 
-  private RootExec getRoot() {
-    return root;
+  /**
+   * Create and return the fragment RootExec for the given FragmentRoot. The RootExec has one or more
+   * RecordBatches as children (which may in turn contain child RecordBatches, and so on).
+   * @param context FragmentContext.
+   * @param root FragmentRoot.
+   * @return RootExec of fragment.
+   * @throws ExecutionSetupException
+   */
+  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+    Preconditions.checkNotNull(root);
+    Preconditions.checkNotNull(context);
+
+    if (AssertionUtil.isAssertionsEnabled()) {
+      root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
+    }
+
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    final RootExec rootExec = INSTANCE.getRootExec(root, context);
+    logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS));
+    if (rootExec == null) {
+      throw new ExecutionSetupException(
+          "The provided fragment did not have a root node that correctly created a RootExec value.");
+    }
+
+    return rootExec;
   }
 
-  @Override
-  @SuppressWarnings("unchecked")
-  public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
-    Preconditions.checkNotNull(op);
-    Preconditions.checkNotNull(context);
+  /** Create the RootExec and its children (RecordBatches) for the given FragmentRoot */
+  private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException {
+    final List<RecordBatch> childRecordBatches = getChildren(root, context);
 
-    Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(op.getClass());
-    if (opCreator != null) {
-      if (op instanceof FragmentRoot ) {
-        root = ((RootCreator<PhysicalOperator>)opCreator).getRoot(context, op, getChildren(op, context));
-        return null;
-      } else {
-        return ((BatchCreator<PhysicalOperator>)opCreator).getBatch(context, op, getChildren(op, context));
+    if (context.isImpersonationEnabled()) {
+      final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(root.getUserName(), context.getQueryUserName());
+      try {
+        return proxyUgi.doAs(new PrivilegedExceptionAction<RootExec>() {
+          public RootExec run() throws Exception {
+            return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
+          }
+        });
+      } catch (InterruptedException | IOException e) {
+        final String errMsg = String.format("Failed to create RootExec for operator with id '%d'", root.getOperatorId());
+        logger.error(errMsg, e);
+        throw new ExecutionSetupException(errMsg, e);
       }
     } else {
-      throw new UnsupportedOperationException(String.format(
-          "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.",
-          this.getClass().getCanonicalName(), op.getClass().getCanonicalName()));
+      return ((RootCreator<PhysicalOperator>) getOpCreator(root, context)).getRoot(context, root, childRecordBatches);
     }
   }
 
-  private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
-    List<RecordBatch> children = Lists.newArrayList();
-    for (PhysicalOperator child : op) {
-      children.add(child.accept(this, context));
+  /** Create a RecordBatch and its children for the given PhysicalOperator */
+  private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkNotNull(op);
+
+    final List<RecordBatch> childRecordBatches = getChildren(op, context);
+
+    if (context.isImpersonationEnabled()) {
+      final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(op.getUserName(), context.getQueryUserName());
+      try {
+        return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
+          public RecordBatch run() throws Exception {
+            return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
+          }
+        });
+      } catch (InterruptedException | IOException e) {
+        final String errMsg = String.format("Failed to create RecordBatch for operator with id '%d'", op.getOperatorId());
+        logger.error(errMsg, e);
+        throw new ExecutionSetupException(errMsg, e);
+      }
+    } else {
+      return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
     }
-    return children;
   }
 
-  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
-    ImplCreator i = new ImplCreator();
-    if (AssertionUtil.isAssertionsEnabled()) {
-      root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
+  /** Helper method to get the OperatorCreator (RootCreator or BatchCreator) for the given PhysicalOperator (root or non-root) */
+  private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+    final Class opClass = op.getClass();
+    Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass);
+    if (opCreator == null) {
+      throw new UnsupportedOperationException(
+          String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName()));
     }
 
-    Stopwatch watch = new Stopwatch();
-    watch.start();
-    root.accept(i, context);
-    logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
-    if (i.root == null) {
-      throw new ExecutionSetupException(
-          "The provided fragment did not have a root node that correctly created a RootExec value.");
-    }
-    return i.getRoot();
+    return opCreator;
   }
 
-}
+  /** Helper method to traverse the children of the given PhysicalOperator and create their RecordBatches recursively */
+  private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+    List<RecordBatch> children = Lists.newArrayList();
+    for (PhysicalOperator child : op) {
+      children.add(getRecordBatch(child, context));
+    }
+
+    return children;
+  }
+}
\ No newline at end of file
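
The heart of the rewrite above is the Hadoop proxy-UGI pattern: build a
UserGroupInformation that impersonates the operator's user and run operator setup
inside doAs(), so filesystem permission checks apply to that user rather than to the
Drillbit's login user. Below is a self-contained sketch of that pattern;
ImpersonationUtil.createProxyUgi is assumed to wrap Hadoop's createProxyUser, and
'creator', 'op', 'context', and 'childRecordBatches' stand in for the values used above.

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    try {
      // Proxy identity for the operator's user, backed by the Drillbit's login user.
      final UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(
          op.getUserName(), UserGroupInformation.getLoginUser());
      RecordBatch batch = proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
        @Override
        public RecordBatch run() throws Exception {
          // Executes with the proxy user's identity, not the Drillbit user's.
          return creator.getBatch(context, op, childRecordBatches);
        }
      });
    } catch (InterruptedException | IOException e) {
      throw new ExecutionSetupException("Failed to create RecordBatch for operator", e);
    }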

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index c8f872e..5451ca0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -28,29 +28,48 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptTable;
 
 public abstract class DrillTable implements Table {
 
   private final String storageEngineName;
-  public final StoragePluginConfig storageEngineConfig;
-  private Object selection;
-  private StoragePlugin plugin;
+  private final StoragePluginConfig storageEngineConfig;
+  private final Object selection;
+  private final StoragePlugin plugin;
+  private final String userName;
+
   private GroupScan scan;
 
-  /** Creates a DrillTable. */
-  public DrillTable(String storageEngineName, StoragePlugin plugin, Object selection) {
+  /**
+   * Creates a DrillTable instance.
+   * @param storageEngineName StorageEngine name.
+   * @param plugin Reference to StoragePlugin.
+   * @param userName Name of the user to impersonate while reading the contents of the table.
+   * @param selection Table contents (type and contents depend on type of StoragePlugin).
+   */
+  public DrillTable(String storageEngineName, StoragePlugin plugin, String userName, Object selection) {
     this.selection = selection;
     this.plugin = plugin;
 
     this.storageEngineConfig = plugin.getConfig();
     this.storageEngineName = storageEngineName;
+    this.userName = userName;
+  }
+
+  /**
+   * TODO: Same purpose as the other constructor, except the impersonation user is the user running the
+   * Drillbit process. Once we add impersonation to non-FileSystem storage plugins such as Hive and HBase,
+   * we can remove this constructor.
+   */
+  public DrillTable(String storageEngineName, StoragePlugin plugin, Object selection) {
+    this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), selection);
   }
 
   public GroupScan getGroupScan() throws IOException{
     if (scan == null) {
-      this.scan = plugin.getPhysicalScan(new JSONOptions(selection));
+      this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection));
     }
     return scan;
   }
@@ -94,6 +113,7 @@ public abstract class DrillTable implements Table {
     result = prime * result + ((selection == null) ? 0 : selection.hashCode());
     result = prime * result + ((storageEngineConfig == null) ? 0 : storageEngineConfig.hashCode());
     result = prime * result + ((storageEngineName == null) ? 0 : storageEngineName.hashCode());
+    result = prime * result + ((userName == null) ? 0 : userName.hashCode());
     return result;
   }
 
@@ -130,6 +150,13 @@ public abstract class DrillTable implements Table {
     } else if (!storageEngineName.equals(other.storageEngineName)) {
       return false;
     }
+    if (userName == null) {
+      if (other.userName != null) {
+        return false;
+      }
+    } else if (!userName.equals(other.userName)) {
+      return false;
+    }
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
index 68e666a..9c5a94f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillViewTable.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.planner.logical;
 
-import java.util.List;
-
 import net.hydromatic.optiq.Schema.TableType;
 import net.hydromatic.optiq.Statistic;
 import net.hydromatic.optiq.Statistics;
 import net.hydromatic.optiq.TranslatableTable;
 
 import org.apache.drill.exec.dotdrill.View;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.ops.ViewExpansionContext.ViewExpansionToken;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptTable;
 import org.eigenbase.relopt.RelOptTable.ToRelContext;
@@ -36,9 +36,13 @@ public class DrillViewTable implements TranslatableTable, DrillViewInfoProvider
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillViewTable.class);
 
   private final View view;
+  private final String viewOwner;
+  private final ViewExpansionContext viewExpansionContext;
 
-  public DrillViewTable(List<String> path, View view){
+  public DrillViewTable(View view, String viewOwner, ViewExpansionContext viewExpansionContext){
     this.view = view;
+    this.viewOwner = viewOwner;
+    this.viewExpansionContext = viewExpansionContext;
   }
 
   @Override
@@ -53,15 +57,28 @@ public class DrillViewTable implements TranslatableTable, DrillViewInfoProvider
 
   @Override
   public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {
-    RelDataType rowType = relOptTable.getRowType();
-    RelNode rel = context.expandView(rowType, view.getSql(), view.getWorkspaceSchemaPath());
+    ViewExpansionToken token = null;
+    try {
+      RelDataType rowType = relOptTable.getRowType();
+      RelNode rel;
+
+      if (viewExpansionContext.isImpersonationEnabled()) {
+        token = viewExpansionContext.reserveViewExpansionToken(viewOwner);
+        rel = context.expandView(rowType, view.getSql(), token.getSchemaTree(), view.getWorkspaceSchemaPath());
+      } else {
+        rel = context.expandView(rowType, view.getSql(), view.getWorkspaceSchemaPath());
+      }
+
+      // If the View's field list is not "*", create a cast.
+      if (!view.isDynamic() && !view.hasStar()) {
+        rel = RelOptUtil.createCastRel(rel, rowType, true);
+      }
 
-    if (view.isDynamic() || view.hasStar()){
-      // if View's field has "*", return rel directly.
       return rel;
-    }else{
-      // if the View's field list is not "*", try to create a cast.
-      return RelOptUtil.createCastRel(rel, rowType, true);
+    } finally {
+      if (token != null) {
+        token.release();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
index 843db58..24917f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DynamicDrillTable.java
@@ -28,6 +28,15 @@ public class DynamicDrillTable extends DrillTable{
 
   private RelDataTypeHolder holder = new RelDataTypeHolder();
 
+  public DynamicDrillTable(StoragePlugin plugin, String storageEngineName, String userName, Object selection) {
+    super(storageEngineName, plugin, userName, selection);
+  }
+
+  /**
+   * TODO: Same purpose as the other constructor, except the impersonation user is the user running the
+   * Drillbit process. Once we add impersonation to non-FileSystem storage plugins such as Hive and HBase,
+   * we can remove this constructor.
+   */
   public DynamicDrillTable(StoragePlugin plugin, String storageEngineName, Object selection) {
     super(storageEngineName, plugin, selection);
   }
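
Combined with the DrillTable change earlier in this commit, the extra constructor lets
a schema factory record which user a table was resolved for, and the resulting group
scan then carries that user. A hypothetical call site (the constructor signature is
from the diff; 'plugin', 'schemaUser', and 'selection' are assumed to be in scope):

    // schemaUser: the user whose schema tree resolved this table.
    DrillTable table = new DynamicDrillTable(plugin, "dfs", schemaUser, selection);
    GroupScan scan = table.getGroupScan();  // delegates to plugin.getPhysicalScan(schemaUser, ...)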

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 097b7bb..d56f1db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.foreman.ForemanException;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.security.AccessControlException;
 import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.rules.ReduceExpressionsRule;
 import org.eigenbase.rel.rules.WindowedAggSplitterRule;
@@ -159,6 +160,8 @@ public class DrillSqlWorker {
     } catch(ValidationException e) {
       String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
       throw UserException.parseError(e).message(errorMessage).build();
+    } catch (AccessControlException e) {
+      throw UserException.permissionError(e).build();
     } catch (IOException | RelConversionException e) {
       throw new QueryInputException("Failure handling SQL.", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index a17a604..3e990c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -58,8 +58,8 @@ public class CreateTableHandler extends DefaultSqlHandler {
       AbstractSchema drillSchema = getDrillSchema(schema);
 
       if (!drillSchema.isMutable()) {
-        return DirectPlan.createDirectPlan(context, false, String.format("Current schema '%s' is not a mutable schema. " +
-            "Can't create tables in this schema.", drillSchema.getFullSchemaName()));
+        return DirectPlan.createDirectPlan(context, false, String.format("Unable to create table. " +
+            "Schema [%s] is immutable.", drillSchema.getFullSchemaName()));
       }
 
       final String newTblName = sqlCreateTable.getName();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
index a486369..5f9061a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.torel;
 import java.util.List;
 import java.util.Map;
 
+import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.prepare.Prepare;
 
 import org.apache.drill.common.expression.LogicalExpression;
@@ -114,6 +115,11 @@ public class ConversionContext implements ToRelContext {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
+    throw new UnsupportedOperationException();
+  }
+
   private static class ConverterVisitor extends AbstractLogicalVisitor<RelNode, ConversionContext, InvalidRelException>{
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a745479..3dc7c14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -96,6 +96,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.HASH_JOIN_TABLE_FACTOR,
       ExecConstants.HASH_AGG_TABLE_FACTOR,
       ExecConstants.AVERAGE_FIELD_WIDTH,
+      ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE,
       QueryClassLoader.JAVA_COMPILER_DEBUG,

