drill-commits mailing list archives

From ve...@apache.org
Subject [1/4] drill git commit: DRILL-2514: Add support for impersonation in FileSystem storage plugin.
Date Tue, 21 Apr 2015 22:21:36 GMT
Repository: drill
Updated Branches:
  refs/heads/master fbb405bdb -> 2a484251b


http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index b032fce..58c8622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -49,12 +49,12 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
-    return getPhysicalScan(selection, AbstractGroupScan.ALL_COLUMNS);
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
     throw new UnsupportedOperationException();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
new file mode 100644
index 0000000..0297945
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaConfig.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.ViewExpansionContext;
+import org.apache.drill.exec.server.options.OptionValue;
+
+/**
+ * Contains information needed by {@link org.apache.drill.exec.store.AbstractSchema} implementations.
+ */
+public class SchemaConfig {
+  private final String userName;
+  private final QueryContext queryContext;
+  private final boolean ignoreAuthErrors;
+
+  private SchemaConfig(final String userName, final QueryContext queryContext, final boolean ignoreAuthErrors) {
+    this.userName = userName;
+    this.queryContext = queryContext;
+    this.ignoreAuthErrors = ignoreAuthErrors;
+  }
+
+  public static Builder newBuilder(final String userName, final QueryContext queryContext) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(userName), "A valid userName is expected");
+    Preconditions.checkNotNull(queryContext, "Non-null QueryContext is expected");
+    return new Builder(userName, queryContext);
+  }
+
+  public static class Builder {
+    final String userName;
+    final QueryContext queryContext;
+    boolean ignoreAuthErrors;
+
+    private Builder(final String userName, final QueryContext queryContext) {
+      this.userName = userName;
+      this.queryContext = queryContext;
+    }
+
+    public Builder setIgnoreAuthErrors(boolean ignoreAuthErrors) {
+      this.ignoreAuthErrors = ignoreAuthErrors;
+      return this;
+    }
+
+    public SchemaConfig build() {
+      return new SchemaConfig(userName, queryContext, ignoreAuthErrors);
+    }
+  }
+
+  public QueryContext getQueryContext() {
+    return queryContext;
+  }
+
+  /**
+   * @return User to impersonate while {@link net.hydromatic.optiq.SchemaPlus} instances
+   * interact with the underlying storage.
+   */
+  public String getUserName() {
+    return userName;
+  }
+
+  /**
+   * @return Whether to ignore authorization errors reported while {@link net.hydromatic.optiq.SchemaPlus}
+   * instances interact with the underlying storage.
+   */
+  public boolean getIgnoreAuthErrors() {
+    return ignoreAuthErrors;
+  }
+
+  public OptionValue getOption(String optionKey) {
+    return queryContext.getOptions().getOption(optionKey);
+  }
+
+  public ViewExpansionContext getViewExpansionContext() {
+    return queryContext.getViewExpansionContext();
+  }
+}

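For context, a minimal sketch of how a caller might assemble a SchemaConfig via the builder above; queryUserName and queryContext are illustrative variables, not part of this patch:

    // Builder usage per the API above; ignoreAuthErrors defaults to false.
    SchemaConfig schemaConfig = SchemaConfig
        .newBuilder(queryUserName, queryContext)
        .setIgnoreAuthErrors(true) // e.g. so schema listing silently skips unauthorized objects
        .build();
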
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index 14d2fab..e2dc613 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -19,12 +19,22 @@ package org.apache.drill.exec.store;
 
 import net.hydromatic.optiq.SchemaPlus;
 
-import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.ops.QueryContext;
 
 import java.io.IOException;
 
+/**
+ * StoragePlugins implement this interface to register the schemas they provide.
+ */
 public interface SchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
 
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
+  /**
+   * Register the schemas provided by this SchemaFactory implementation under the given parent schema.
+   *
+   * @param schemaConfig Configuration for schema objects.
+   * @param parent Reference to parent schema.
+   * @throws IOException in case of error during schema registration
+   */
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException;
 }

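A hypothetical implementation of the new contract, for illustration only. ExampleSchema and its name are invented; the sketch assumes AbstractSchema's two-argument constructor and abstract getTypeName(), consistent with its use elsewhere in this patch:

    import java.io.IOException;
    import net.hydromatic.optiq.SchemaPlus;
    import com.google.common.collect.ImmutableList;
    import org.apache.drill.exec.store.AbstractSchema;
    import org.apache.drill.exec.store.SchemaConfig;
    import org.apache.drill.exec.store.SchemaFactory;

    public class ExampleSchemaFactory implements SchemaFactory {
      @Override
      public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
        // Keep the SchemaConfig so later table lookups can impersonate
        // schemaConfig.getUserName() and honor getIgnoreAuthErrors().
        ExampleSchema schema = new ExampleSchema(schemaConfig);
        parent.add(schema.getName(), schema);
      }

      private static class ExampleSchema extends AbstractSchema {
        private final SchemaConfig schemaConfig;

        ExampleSchema(SchemaConfig schemaConfig) {
          super(ImmutableList.<String>of(), "example");
          this.schemaConfig = schemaConfig;
        }

        @Override
        public String getTypeName() {
          return "example";
        }
      }
    }
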
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index ef5978c..b60c16f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -36,26 +36,24 @@ public interface StoragePlugin extends SchemaFactory {
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param selection
-   *          The configured storage engine specific selection.
+   * @param userName User to impersonate when reading the contents as part of the scan.
+   * @param selection The configured storage engine specific selection.
    * @return
    * @throws IOException
    */
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection)
-      throws IOException;
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException;
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
    *
-   * @param selection
-   *          The configured storage engine specific selection.
-   * @param columns
-   *          (optional) The list of column names to scan from the data source.
+   * @param userName User to impersonate when reading the contents as part of the scan.
+   * @param selection The configured storage engine specific selection.
+   * @param columns (optional) The list of column names to scan from the data source.
    * @return
    * @throws IOException
    */
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection,
-      List<SchemaPath> columns) throws IOException;
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException;
 
   public StoragePluginConfig getConfig();
 

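The intended call-site shape, sketched against the signatures above; queryUserName stands in for however the planner obtains the query user and is not defined in this hunk:

    // The planner threads the query user through to the storage plugin so the
    // resulting group scan can impersonate that user when reading data.
    AbstractGroupScan scan =
        storagePlugin.getPhysicalScan(queryUserName, selection, AbstractGroupScan.ALL_COLUMNS);
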
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index cb9ee0f..bda4cc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -42,6 +42,8 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.util.PathScanner;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.ViewExpansionContext;
 import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -301,7 +303,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
   public class DrillSchemaFactory implements SchemaFactory {
 
     @Override
-    public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+    public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
       Stopwatch watch = new Stopwatch();
       watch.start();
 
@@ -325,7 +327,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
 
         // finally register schemas with the refreshed plugins
         for (StoragePlugin plugin : plugins.values()) {
-          plugin.registerSchemas(session, parent);
+          plugin.registerSchemas(schemaConfig, parent);
         }
       } catch (ExecutionSetupException e) {
         throw new DrillRuntimeException("Failure while updating storage plugins", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 775b402..93fb0a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -29,12 +29,13 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.ClassPathFileSystem;
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -110,7 +111,8 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
     FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class);
     FormatPlugin plugin;
     if (formatSelection.getFormat() instanceof NamedFormatPluginConfig) {
@@ -121,12 +123,12 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
     if (plugin == null) {
       throw new IOException(String.format("Failure getting requested format plugin named '%s'.  It was not one of the format plugins registered.", formatSelection.getFormat()));
     }
-    return plugin.getGroupScan(formatSelection.getSelection(), columns);
+    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns);
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(session, parent);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
   }
 
   public FormatPlugin getFormatPlugin(String name) {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index e11712e..30d8d25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -27,10 +27,11 @@ import net.hydromatic.optiq.Function;
 import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.Table;
 
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 
@@ -58,8 +59,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
-    FileSystemSchema schema = new FileSystemSchema(schemaName, session);
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    FileSystemSchema schema = new FileSystemSchema(schemaName, schemaConfig);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
     schema.setPlus(plusOfThis);
   }
@@ -69,10 +70,10 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     private final WorkspaceSchema defaultSchema;
     private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
 
-    public FileSystemSchema(String name, UserSession session) throws IOException {
+    public FileSystemSchema(String name, SchemaConfig schemaConfig) throws IOException {
       super(ImmutableList.<String>of(), name);
       for(WorkspaceSchemaFactory f :  factories){
-        WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
+        WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig);
         schemaMap.put(s.getName(), s);
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 955dfeb..5668c54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -44,11 +44,9 @@ public interface FormatPlugin {
 
   public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException;
 
-  public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException;
-
   public Set<StoragePluginOptimizerRule> getOptimizerRules();
 
-  public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException;
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException;
 
   public FormatPluginConfig getConfig();
   public StoragePluginConfig getStorageConfig();

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index a536350..7cd50b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -30,6 +31,7 @@ import net.hydromatic.optiq.Table;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.dotdrill.DotDrillFile;
 import org.apache.drill.exec.dotdrill.DotDrillType;
@@ -41,9 +43,10 @@ import org.apache.drill.exec.planner.logical.DrillViewTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +55,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
 
 public class WorkspaceSchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
@@ -106,25 +111,27 @@ public class WorkspaceSchemaFactory {
     return DotDrillType.VIEW.getPath(config.getLocation(), name);
   }
 
-  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
-    return new WorkspaceSchema(parentSchemaPath, schemaName, session);
+  public WorkspaceSchema createSchema(List<String> parentSchemaPath, SchemaConfig schemaConfig) throws IOException {
+    return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig);
   }
 
   public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
     private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
-    private final UserSession session;
+    private final SchemaConfig schemaConfig;
     private final DrillFileSystem fs;
 
-    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig) throws IOException {
       super(parentSchemaPath, wsName);
-      this.session = session;
-      this.fs = new DrillFileSystem(fsConf);
+      this.schemaConfig = schemaConfig;
+      this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
     }
 
     public boolean createView(View view) throws Exception {
       Path viewPath = getViewPath(view.getName());
       boolean replaced = fs.exists(viewPath);
-      try (OutputStream stream = fs.create(viewPath)) {
+      final FsPermission viewPerms =
+          new FsPermission(schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
+      try (OutputStream stream = DrillFileSystem.create(fs, viewPath, viewPerms)) {
         mapper.writeValue(stream, view);
       }
       return replaced;
@@ -145,11 +152,6 @@ public class WorkspaceSchemaFactory {
       return new SubDirectoryList(fileStatuses);
     }
 
-    public boolean viewExists(String viewName) throws Exception {
-      Path viewPath = getViewPath(viewName);
-      return fs.exists(viewPath);
-    }
-
     public void dropView(String viewName) throws IOException {
       fs.delete(getViewPath(viewName), false);
     }
@@ -165,6 +167,14 @@ public class WorkspaceSchemaFactory {
         }
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
+      } catch (AccessControlException e) {
+        if (!schemaConfig.getIgnoreAuthErrors()) {
+          logger.debug(e.getMessage());
+          throw UserException
+              .permissionError(e)
+              .message("Not authorized to list view tables in schema [%s]", getFullSchemaName())
+              .build();
+        }
       } catch (Exception e) {
         logger.warn("Failure while trying to list .view.drill files in workspace [{}]", getFullSchemaName(), e);
       }
@@ -177,7 +187,7 @@ public class WorkspaceSchemaFactory {
       return Sets.union(tables.keySet(), getViews());
     }
 
-    private View getView(DotDrillFile f) throws Exception{
+    private View getView(DotDrillFile f) throws IOException{
       assert f.getType() == DotDrillType.VIEW;
       return f.getView(drillConfig);
     }
@@ -190,19 +200,42 @@ public class WorkspaceSchemaFactory {
       }
 
       // then look for files that start with this name and end in .drill.
-      List<DotDrillFile> files;
+        List<DotDrillFile> files = Collections.emptyList();
       try {
-        files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), name, DotDrillType.VIEW);
+        try {
+          files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), name, DotDrillType.VIEW);
+        } catch(AccessControlException e) {
+          if (!schemaConfig.getIgnoreAuthErrors()) {
+            logger.debug(e.getMessage());
+            throw UserException
+                .permissionError(e)
+                .message("Not authorized to list or query tables in schema [%s]", getFullSchemaName())
+                .build();
+          }
+        } catch(IOException e) {
+          logger.warn("Failure while trying to list view tables in workspace [{}]", getFullSchemaName(), e);
+        }
+
         for(DotDrillFile f : files) {
           switch(f.getType()) {
           case VIEW:
-            return new DrillViewTable(schemaPath, getView(f));
+            try {
+              return new DrillViewTable(getView(f), f.getOwner(), schemaConfig.getViewExpansionContext());
+            } catch (AccessControlException e) {
+              if (!schemaConfig.getIgnoreAuthErrors()) {
+                logger.debug(e.getMessage());
+                throw UserException
+                    .permissionError(e)
+                    .message("Not authorized to read view [%s] in schema [%s]", name, getFullSchemaName())
+                    .build();
+              }
+            } catch (IOException e) {
+              logger.warn("Failure while trying to load {}.view.drill file in workspace [{}]", name, getFullSchemaName(), e);
+            }
           }
         }
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
-      } catch (Exception e) {
-        logger.warn("Failure while trying to load {}.view.drill file in workspace [{}]", name, getFullSchemaName(), e);
       }
 
       return tables.get(name);
@@ -223,7 +256,7 @@ public class WorkspaceSchemaFactory {
 
     @Override
     public CreateTableEntry createNewTable(String tableName) {
-      String storage = session.getOptions().getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
+      String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
       FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
       if (formatPlugin == null) {
         throw new UnsupportedOperationException(
@@ -256,7 +289,7 @@ public class WorkspaceSchemaFactory {
             try {
               Object selection = m.isReadable(fs, fileSelection);
               if (selection != null) {
-                return new DynamicDrillTable(plugin, storageEngineName, selection);
+                return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
               }
             } catch (IOException e) {
               logger.debug("File read failed.", e);
@@ -268,11 +301,19 @@ public class WorkspaceSchemaFactory {
         for (FormatMatcher m : fileMatchers) {
           Object selection = m.isReadable(fs, fileSelection);
           if (selection != null) {
-            return new DynamicDrillTable(plugin, storageEngineName, selection);
+            return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
           }
         }
         return null;
 
+      } catch (AccessControlException e) {
+        if (!schemaConfig.getIgnoreAuthErrors()) {
+          logger.debug(e.getMessage());
+          throw UserException
+              .permissionError(e)
+              .message("Not authorized to read table [%s] in schema [%s]", key, getFullSchemaName())
+              .build();
+        }
       } catch (IOException e) {
         logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
       }

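ImpersonationUtil itself lands in another part of this patch series, so the following is an assumption about what createFileSystem does internally, sketched with plain Hadoop APIs: build a proxy UGI for the query user on top of the Drillbit's login user, then open the FileSystem under doAs so subsequent HDFS RPCs carry the impersonated identity. Hadoop only permits this when the service user is allowlisted via the hadoop.proxyuser.* settings.

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical sketch of ImpersonationUtil.createFileSystem's internals.
    static FileSystem createFileSystemAs(String userName, final Configuration fsConf)
        throws IOException, InterruptedException {
      UserGroupInformation proxyUgi =
          UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
      return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws IOException {
          return FileSystem.get(fsConf); // opened with the proxy identity
        }
      });
    }
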
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 5c7152a..f1271b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
@@ -147,7 +148,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
         newColumns.add(AbstractRecordReader.STAR_COLUMN);
       }
       // Create a new sub scan object with the new set of columns;
-      scan = new EasySubScan(scan.getWorkUnits(), scan.getFormatPlugin(), newColumns, scan.getSelectionRoot());
+      scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), newColumns,
+          scan.getSelectionRoot());
     }
 
     int numParts = 0;
@@ -203,13 +205,9 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   }
 
   @Override
-  public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return new EasyGroupScan(selection, this, selection.selectionRoot);
-  }
-
-  @Override
-  public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new EasyGroupScan(selection, this, columns, selection.selectionRoot);
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+      throws IOException {
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 7c70df3..1b333ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -49,6 +49,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.util.ImpersonationUtil;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan{
@@ -66,6 +67,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
 
   @JsonCreator
   public EasyGroupScan(
+      @JsonProperty("userName") String userName,
       @JsonProperty("files") List<String> files, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -73,23 +75,26 @@ public class EasyGroupScan extends AbstractFileGroupScan{
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
-        this(new FileSelection(files, true),
+        this(ImpersonationUtil.resolveUserName(userName),
+            new FileSelection(files, true),
             (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig),
             columns,
             selectionRoot);
   }
 
-  public EasyGroupScan(FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
+  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
       throws IOException {
-    this(selection, formatPlugin, ALL_COLUMNS, selectionRoot);
+    this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
   }
 
   public EasyGroupScan(
+      String userName,
       FileSelection selection, //
       EasyFormatPlugin<?> formatPlugin, //
       List<SchemaPath> columns,
       String selectionRoot
       ) throws IOException{
+    super(userName);
     this.selection = Preconditions.checkNotNull(selection);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
@@ -98,7 +103,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
   }
 
   private EasyGroupScan(EasyGroupScan that) {
-    Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+    super(that.getUserName());
     selection = that.selection;
     formatPlugin = that.formatPlugin;
     columns = that.columns;
@@ -110,7 +115,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
   }
 
   private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
-    final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
+    final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
     this.selection = selection;
     BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
     this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
@@ -203,7 +208,7 @@ public class EasyGroupScan extends AbstractFileGroupScan{
     Preconditions.checkArgument(!filesForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new EasySubScan(convert(filesForMinor), formatPlugin, columns, selectionRoot);
+    return new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
   }
 
   private List<FileWorkImpl> convert(List<CompleteFileWork> list) {

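ImpersonationUtil.resolveUserName is likewise defined elsewhere in the series; a plausible reading, stated purely as an assumption, is that it falls back to the Drillbit process user when a serialized plan arrives without a userName (for example, a plan produced before this change):

    // Hypothetical sketch of the assumed resolveUserName semantics.
    public static String resolveUserName(String userName) {
      // Strings is Guava's; getProcessUserName() is a hypothetical helper
      // returning the Drillbit's own (process) user.
      return Strings.isNullOrEmpty(userName) ? getProcessUserName() : userName;
    }
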
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index e78ba0b..5fd5039 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -47,6 +47,7 @@ public class EasySubScan extends AbstractSubScan{
 
   @JsonCreator
   public EasySubScan(
+      @JsonProperty("userName") String userName,
       @JsonProperty("files") List<FileWorkImpl> files, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -54,7 +55,7 @@ public class EasySubScan extends AbstractSubScan{
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
-
+    super(userName);
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(this.formatPlugin);
     this.files = files;
@@ -62,7 +63,9 @@ public class EasySubScan extends AbstractSubScan{
     this.selectionRoot = selectionRoot;
   }
 
-  public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, String selectionRoot){
+  public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
+      String selectionRoot){
+    super(userName);
     this.formatPlugin = plugin;
     this.files = files;
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 262c6be..e08fe71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -38,7 +38,7 @@ public class DirectGroupScan extends AbstractGroupScan{
   private final RecordReader reader;
 
   public DirectGroupScan(RecordReader reader) {
-    super();
+    super((String)null);
     this.reader = reader;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
index 89694f8..763ecba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
@@ -27,7 +27,7 @@ public class DirectSubScan extends AbstractSubScan{
   private final RecordReader reader;
 
   public DirectSubScan(RecordReader reader) {
-    super();
+    super(null);
     this.reader = reader;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 237589c..722650d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -77,8 +77,9 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new EasyGroupScan(selection, this, columns, selection.selectionRoot); //TODO : textformat supports project?
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+      throws IOException {
+    return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index 22cc483..bd0d582 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -54,11 +54,13 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
   @JsonCreator
   public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table,
                              @JsonProperty("filter") InfoSchemaFilter filter) {
+    super((String)null);
     this.table = table;
     this.filter = filter;
   }
 
   private InfoSchemaGroupScan(InfoSchemaGroupScan that) {
+    super(that);
     this.table = that.table;
     this.filter = that.filter;
     this.isFilterPushedDown = that.isFilterPushedDown;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index a1249e6..4dfde7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -29,7 +29,6 @@ import net.hydromatic.optiq.Table;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -37,6 +36,7 @@ import org.apache.drill.exec.store.AbstractStoragePlugin;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 
 public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements InfoSchemaConstants {
@@ -58,7 +58,8 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
   }
 
   @Override
-  public InfoSchemaGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public InfoSchemaGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
     SelectedTable table = selection.getWith(context.getConfig(),  SelectedTable.class);
     return new InfoSchemaGroupScan(table);
   }
@@ -69,7 +70,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     ISchema s = new ISchema(parent, this);
     parent.add(s.getName(), s);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
index b9349b0..7a479d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
@@ -32,6 +32,7 @@ public class InfoSchemaSubScan extends AbstractSubScan{
   @JsonCreator
   public InfoSchemaSubScan(@JsonProperty("table") SelectedTable table,
                            @JsonProperty("filter") InfoSchemaFilter filter) {
+    super(null);
     this.table = table;
     this.filter = filter;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
index 947998d..5b132c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
@@ -116,8 +116,14 @@ public abstract class RecordGenerator implements InfoSchemaConstants {
       // ... do for each of the schema's tables.
       for (String tableName: schema.getTableNames()) {
         Table table = schema.getTable(tableName);
-        // Visit the table, and if requested ...
 
+        if (table == null) {
+          // Schema may return null for a table if the query user doesn't have permission to load it. Ignore such
+          // tables, as INFO SCHEMA is about showing only the tables the user has access to query.
+          continue;
+        }
+
+        // Visit the table, and if requested ...
         if (shouldVisitTable(schemaPath, tableName) && visitTable(schemaPath,  tableName, table)) {
           // ... do for each of the table's fields.
           RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl());

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index dc90a33..bb71c31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -53,8 +53,8 @@ public class MockGroupScanPOP extends AbstractGroupScan {
 
   @JsonCreator
   public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+    super((String)null);
     this.readEntries = readEntries;
-
     this.url = url;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 96226a1..1689300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -27,9 +27,9 @@ import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -45,7 +45,8 @@ public class MockStorageEngine extends AbstractStoragePlugin {
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
 
     ArrayList<MockScanEntry> readEntries = selection.getListWith(new ObjectMapper(),
         new TypeReference<ArrayList<MockScanEntry>>() {
@@ -55,7 +56,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 9c83ea0..7298f53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -161,14 +161,9 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   @Override
-  public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return getGroupScan(selection, null);
-  }
-
-  @Override
-  public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    final DrillFileSystem dfs = new DrillFileSystem(fsConf);
-    return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
+  public ParquetGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+      throws IOException {
+    return new ParquetGroupScan(userName, selection, this, selection.selectionRoot, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a59f2c9..21b9b48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,12 +18,14 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -50,17 +52,17 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import parquet.hadoop.Footer;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
-import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -78,17 +80,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
 
-  private ListMultimap<Integer, RowGroupInfo> mappings;
-  private List<RowGroupInfo> rowGroupInfos;
   private final List<ReadEntryWithPath> entries;
   private final Stopwatch watch = new Stopwatch();
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
-  private final FileSystem fs;
-  private List<EndpointAffinity> endpointAffinities;
-  private String selectionRoot;
+  private final DrillFileSystem fs;
+  private final String selectionRoot;
 
+  private List<EndpointAffinity> endpointAffinities;
   private List<SchemaPath> columns;
+  private ListMultimap<Integer, RowGroupInfo> mappings;
+  private List<RowGroupInfo> rowGroupInfos;
 
   /*
    * total number of rows (obtained from parquet footer)
@@ -100,22 +102,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    */
   private Map<SchemaPath, Long> columnValueCounts;
 
-  public List<ReadEntryWithPath> getEntries() {
-    return entries;
-  }
-
-  @JsonProperty("format")
-  public ParquetFormatConfig getFormatConfig() {
-    return this.formatConfig;
-  }
-
-  @JsonProperty("storage")
-  public StoragePluginConfig getEngineConfig() {
-    return this.formatPlugin.getStorageConfig();
-  }
-
   @JsonCreator
   public ParquetGroupScan( //
+      @JsonProperty("userName") String userName,
       @JsonProperty("entries") List<ReadEntryWithPath> entries, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
@@ -123,6 +112,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot //
       ) throws IOException, ExecutionSetupException {
+    super(ImpersonationUtil.resolveUserName(userName));
     this.columns = columns;
     if (formatConfig == null) {
       formatConfig = new ParquetFormatConfig();
@@ -131,29 +121,28 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin);
-    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
+    this.fs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
     this.readFooterFromEntries();
-
-  }
-
-  public String getSelectionRoot() {
-    return selectionRoot;
   }
 
-  public ParquetGroupScan(List<FileStatus> files, //
+  public ParquetGroupScan( //
+      String userName,
+      FileSelection selection, //
       ParquetFormatPlugin formatPlugin, //
       String selectionRoot,
       List<SchemaPath> columns) //
           throws IOException {
+    super(userName);
     this.formatPlugin = formatPlugin;
     this.columns = columns;
     this.formatConfig = formatPlugin.getConfig();
-    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
+    this.fs = ImpersonationUtil.createFileSystem(userName, formatPlugin.getFsConf());
 
     this.entries = Lists.newArrayList();
+    List<FileStatus> files = selection.getFileStatusList(fs);
     for (FileStatus file : files) {
       entries.add(new ReadEntryWithPath(file.getPath().toString()));
     }
@@ -167,6 +156,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    * This is used to clone another copy of the group scan.
    */
   private ParquetGroupScan(ParquetGroupScan that) {
+    super(that);
     this.columns = that.columns;
     this.endpointAffinities = that.endpointAffinities;
     this.entries = that.entries;
@@ -180,6 +170,25 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.columnValueCounts = that.columnValueCounts;
   }
 
+
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+
+  @JsonProperty("format")
+  public ParquetFormatConfig getFormatConfig() {
+    return this.formatConfig;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return this.formatPlugin.getStorageConfig();
+  }
+
+  public String getSelectionRoot() {
+    return selectionRoot;
+  }
+
   private void readFooterFromEntries()  throws IOException {
     List<FileStatus> files = Lists.newArrayList();
     for (ReadEntryWithPath e : entries) {
@@ -188,12 +197,27 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     readFooter(files);
   }
 
-  private void readFooter(List<FileStatus> statuses) throws IOException {
+  private void readFooter(final List<FileStatus> statuses) {
+    final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          readFooterHelper(statuses);
+          return null;
+        }
+      });
+    } catch (InterruptedException | IOException e) {
+      final String errMsg = String.format("Failed to read footer entries from parquet input files: %s", e.getMessage());
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+  }
+
+  private void readFooterHelper(List<FileStatus> statuses) throws IOException {
     watch.reset();
     watch.start();
     Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
 
-
     rowGroupInfos = Lists.newArrayList();
     long start = 0, length = 0;
     rowCount = 0;
@@ -373,7 +397,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
+    return new ParquetRowGroupScan(
+        getUserName(), formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
   }
 
   private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {

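The readFooter() change above is the heart of the patch: footer reads now run inside a Hadoop proxy-user context, so the FileSystem calls are authorized as the query user instead of the Drillbit process user. For reference, the bare proxy-user pattern looks like this (a minimal, self-contained sketch; the user name "alice" is hypothetical):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyUgiSketch {
      public static void main(String[] args) throws Exception {
        // Impersonate "alice" on top of the process (login) user, as the
        // Drillbit does for the query user.
        final UserGroupInformation ugi = UserGroupInformation.createProxyUser(
            "alice", UserGroupInformation.getLoginUser());
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
          public Void run() throws Exception {
            // Hadoop FileSystem calls made here execute as "alice".
            System.out.println("Running as " + UserGroupInformation.getCurrentUser());
            return null;
          }
        });
      }
    }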
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index fd40f41..987f792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -55,22 +55,26 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   @JsonCreator
   public ParquetRowGroupScan( //
       @JacksonInject StoragePluginRegistry registry, //
+      @JsonProperty("userName") String userName, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot //
   ) throws ExecutionSetupException {
-    this((ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+    this(userName, (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
             formatConfig == null ? new ParquetFormatConfig() : formatConfig),
         rowGroupReadEntries, columns, selectionRoot);
   }
 
   public ParquetRowGroupScan( //
+      String userName, //
       ParquetFormatPlugin formatPlugin, //
       List<RowGroupReadEntry> rowGroupReadEntries, //
-      List<SchemaPath> columns,
-      String selectionRoot) {
+      List<SchemaPath> columns, //
+      String selectionRoot //
+  ) {
+    super(userName);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
     this.formatConfig = formatPlugin.getConfig();
     this.rowGroupReadEntries = rowGroupReadEntries;
@@ -110,7 +114,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, columns, selectionRoot);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index b1c725c..52dccd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -90,7 +91,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       }
       final int id = rowGroupScan.getOperatorId();
       // Create the new row group scan with the new columns
-      rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getStorageEngine(), rowGroupScan.getRowGroupReadEntries(), newColumns, rowGroupScan.getSelectionRoot());
+      rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
+          rowGroupScan.getRowGroupReadEntries(), newColumns, rowGroupScan.getSelectionRoot());
       rowGroupScan.setOperatorId(id);
     }
 
@@ -100,7 +102,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
     } catch(IOException e) {
       throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
     }
-    Configuration conf = fs.getConf();
+    Configuration conf = new Configuration(fs.getConf());
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
     conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
@@ -119,7 +121,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       try {
         if ( ! footers.containsKey(e.getPath())){
           footers.put(e.getPath(),
-              ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath())));
+              ParquetFileReader.readFooter(conf, new Path(e.getPath())));
         }
         if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
           readers.add(

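A subtle but important fix above: fs.getConf() hands back the live Configuration owned by the file system instance, so calling setBoolean() on it directly would leak the counter overrides to every other consumer of that instance. The patch takes a defensive copy first and, consistently, passes the copy to ParquetFileReader.readFooter():

    // Before: Configuration conf = fs.getConf();         // shared, live object
    // After:
    Configuration conf = new Configuration(fs.getConf()); // private copy
    conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);    // affects only the copy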
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 4a3b97b..a13c945 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -31,11 +31,11 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.apache.drill.exec.store.SchemaConfig;
 
 /**
  * A "storage" plugin for system tables.
@@ -67,12 +67,13 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
     parent.add(schema.getName(), schema);
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
     SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
     return new SystemTableScan(table, this);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index f8baf97..22bd7df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -54,11 +54,13 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
       @JsonProperty("table") SystemTable table, //
       @JacksonInject StoragePluginRegistry engineRegistry //
   ) throws IOException, ExecutionSetupException {
+    super((String)null);
     this.table = table;
     this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
   }
 
   public SystemTableScan(SystemTable table, SystemTablePlugin plugin) {
+    super((String)null);
     this.table = table;
     this.plugin = plugin;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
new file mode 100644
index 0000000..9997178
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ImpersonationUtil.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import com.google.common.base.Strings;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Utilities for impersonation purposes.
+ */
+public class ImpersonationUtil {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImpersonationUtil.class);
+
+  /**
+   * Create and return a proxy user {@link org.apache.hadoop.security.UserGroupInformation} for the operator owner if
+   * the operator owner is valid. Otherwise create and return a proxy user
+   * {@link org.apache.hadoop.security.UserGroupInformation} for the query user.
+   *
+   * @param opUserName Name of the user to impersonate while setting up the operator.
+   * @param queryUserName Name of the user who issues the query. If <i>opUserName</i> is invalid,
+   *                      then this parameter must be a valid user name.
+   * @return Proxy user {@link org.apache.hadoop.security.UserGroupInformation}.
+   */
+  public static UserGroupInformation createProxyUgi(String opUserName, String queryUserName) {
+    if (!Strings.isNullOrEmpty(opUserName)) {
+      return createProxyUgi(opUserName);
+    }
+
+    if (Strings.isNullOrEmpty(queryUserName)) {
+      // TODO(DRILL-2097): Tests that use SimpleRootExec don't assign any query user name in FragmentContext.
+      // Disable throwing the exception to avoid modifying the long list of test files.
+      // throw new DrillRuntimeException("Invalid value for query user name");
+      return getProcessUserUGI();
+    }
+
+    return createProxyUgi(queryUserName);
+  }
+
+  /**
+   * Create and return a proxy user {@link org.apache.hadoop.security.UserGroupInformation} for the given user name.
+   *
+   * TODO: we may want to cache the {@link org.apache.hadoop.security.UserGroupInformation} instances, as we currently
+   * create a new instance for the same user each time, which is unnecessary overhead.
+   *
+   * @param proxyUserName Proxy user name (must be valid)
+   * @return Proxy user {@link org.apache.hadoop.security.UserGroupInformation}.
+   */
+  public static UserGroupInformation createProxyUgi(String proxyUserName) {
+    try {
+      if (Strings.isNullOrEmpty(proxyUserName)) {
+        throw new DrillRuntimeException("Invalid value for proxy user name");
+      }
+
+      return UserGroupInformation.createProxyUser(proxyUserName, UserGroupInformation.getLoginUser());
+    } catch(IOException e) {
+      final String errMsg = "Failed to create proxy user UserGroupInformation object: " + e.getMessage();
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+  }
+
+  /**
+   * If the given user name is empty, return the current process user name. This is a temporary measure to avoid
+   * modifying the long list of test files whose GroupScan operators have no user name property.
+   * @param userName User name found in GroupScan POP definition.
+   */
+  public static String resolveUserName(String userName) {
+    if (!Strings.isNullOrEmpty(userName)) {
+      return userName;
+    }
+    return getProcessUserName();
+  }
+
+  /**
+   * Return the name of the user who is running the Drillbit.
+   *
+   * @return Drillbit process user.
+   */
+  public static String getProcessUserName() {
+    return getProcessUserUGI().getUserName();
+  }
+
+  /**
+   * Return the {@link org.apache.hadoop.security.UserGroupInformation} of the user who is running the Drillbit.
+   *
+   * @return Drillbit process user {@link org.apache.hadoop.security.UserGroupInformation}.
+   */
+  public static UserGroupInformation getProcessUserUGI() {
+    try {
+      return UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      final String errMsg = "Failed to get process user UserGroupInformation object.";
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+  }
+
+  /**
+   * Create a DrillFileSystem for the given <i>proxyUserName</i> and configuration.
+   *
+   * @param proxyUserName Name of the user to impersonate while accessing the FileSystem contents.
+   * @param fsConf FileSystem configuration.
+   * @return DrillFileSystem that authorizes its operations as <i>proxyUserName</i>.
+   */
+  public static DrillFileSystem createFileSystem(String proxyUserName, Configuration fsConf) {
+    return createFileSystem(proxyUserName, fsConf, null);
+  }
+
+  /**
+   * Create a DrillFileSystem for the given <i>proxyUserName</i>, configuration and stats.
+   *
+   * @param proxyUserName Name of the user to impersonate while accessing the FileSystem contents.
+   * @param fsConf FileSystem configuration.
+   * @param stats OperatorStats for DrillFileSystem (optional)
+   * @return DrillFileSystem that authorizes its operations as <i>proxyUserName</i>.
+   */
+  public static DrillFileSystem createFileSystem(String proxyUserName, Configuration fsConf, OperatorStats stats) {
+    return createFileSystem(createProxyUgi(proxyUserName), fsConf, stats);
+  }
+
+  /** Helper method to create DrillFileSystem */
+  private static DrillFileSystem createFileSystem(UserGroupInformation proxyUserUgi, final Configuration fsConf,
+      final OperatorStats stats) {
+    DrillFileSystem fs;
+    try {
+      fs = proxyUserUgi.doAs(new PrivilegedExceptionAction<DrillFileSystem>() {
+        public DrillFileSystem run() throws Exception {
+          logger.debug("Creating DrillFileSystem for proxy user: " + UserGroupInformation.getCurrentUser());
+          return new DrillFileSystem(fsConf, stats);
+        }
+      });
+    } catch (InterruptedException | IOException e) {
+      final String errMsg = "Failed to create DrillFileSystem for proxy user: " + e.getMessage();
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
+
+    return fs;
+  }
+}

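Taken together, a typical caller of the new utility looks like this (a hypothetical sketch; the user name and Configuration are placeholders). Note that on a secured Hadoop cluster the Drillbit process user must additionally be granted proxy-user privileges in core-site.xml (the hadoop.proxyuser.<user>.hosts and hadoop.proxyuser.<user>.groups settings) for createProxyUser to be honored:

    import org.apache.drill.exec.store.dfs.DrillFileSystem;
    import org.apache.drill.exec.util.ImpersonationUtil;
    import org.apache.hadoop.conf.Configuration;

    Configuration fsConf = new Configuration();  // placeholder fs configuration
    // All operations through fs are authorized as "alice", proxied by the
    // Drillbit process user.
    DrillFileSystem fs = ImpersonationUtil.createFileSystem("alice", fsConf);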
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index cb2753c..edbcfde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -779,7 +779,7 @@ public class Foreman implements Runnable {
     if (logger.isDebugEnabled()) {
       logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getConfig()));
     }
-    return new BasicOptimizer(queryContext).optimize(
+    return new BasicOptimizer(queryContext, initiatingClient).optimize(
         new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index be798ec..0701252 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.work.fragment;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -38,7 +39,9 @@ import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
@@ -182,13 +185,23 @@ public class FragmentExecutor implements Runnable {
         }
       }
 
-      injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
-      /*
-       * Run the query until root.next returns false OR we no longer need to continue.
-       */
-      while (shouldContinue() && root.next()) {
-        // loop
-      }
+      final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ?
+          ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
+          ImpersonationUtil.getProcessUserUGI();
+
+      queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
+          /*
+           * Run the query until root.next returns false OR we no longer need to continue.
+           */
+          while (shouldContinue() && root.next()) {
+            // loop
+          }
+
+          return null;
+        }
+      });
 
       updateState(FragmentState.FINISHED);
     } catch (AssertionError | Exception e) {

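The executor now picks the fragment's identity once and runs the entire root.next() loop under doAs. Notably, when impersonation is disabled the loop still runs inside doAs, just as the process user, so both modes share a single code path. The selection, in isolation (impersonationEnabled and queryUserName stand in for the fragmentContext calls):

    final UserGroupInformation queryUserUgi = impersonationEnabled
        ? ImpersonationUtil.createProxyUgi(queryUserName)  // run as the query user
        : ImpersonationUtil.getProcessUserUGI();           // run as the Drillbit user
    queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // ... root.next() loop ...
        return null;
      }
    });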
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6bd8db0..8006533 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -101,6 +101,10 @@ drill.exec: {
       write: true
     }
   },
+  impersonation: {
+    enabled: false,
+    max_chained_user_hops: 3
+  },
   security.user.auth {
     enabled: false,
     packages += "org.apache.drill.exec.rpc.user.security",

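These are the compiled-in defaults: impersonation ships disabled, and chained impersonation (e.g., a view reading another user's view) is capped at three identity hops. A deployment would normally enable it in drill-override.conf on each Drillbit rather than editing the module defaults, along these lines (a sketch):

    # drill-override.conf
    drill.exec.impersonation: {
      enabled: true,
      max_chained_user_hops: 3
    }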
http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index c3223b8..e7f6896 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 import org.apache.drill.exec.testing.ExecutionControls;
@@ -102,7 +103,7 @@ public class PlanningBase extends ExecTest{
     final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
     final DrillOperatorTable table = new DrillOperatorTable(functionRegistry);
     final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
-    registry.getSchemaFactory().registerSchemas(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), root);
+    registry.getSchemaFactory().registerSchemas(SchemaConfig.newBuilder("foo", context).build(), root);
 
     new NonStrictExpectations() {
       {

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index ba905c4..dc37071 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
         bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
-    PhysicalPlan pp = new BasicOptimizer(qc).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
+    PhysicalPlan pp = new BasicOptimizer(qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
 
 
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index 2cba992..604f375 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -38,8 +39,9 @@ public class TestExceptionInjection extends BaseTestQuery {
   private static final String NO_THROW_FAIL = "Didn't throw expected exception";
 
   private static final UserSession session = UserSession.Builder.newBuilder()
-    .withOptionManager(bits[0].getContext().getOptionManager())
-    .build();
+      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .withOptionManager(bits[0].getContext().getOptionManager())
+      .build();
 
   /**
    * Class whose methods we want to simulate runtime exceptions in at run-time for testing
@@ -248,8 +250,9 @@ public class TestExceptionInjection extends BaseTestQuery {
     final DrillbitContext drillbitContext2 = drillbit2.getContext();
 
     final UserSession session = UserSession.Builder.newBuilder()
-      .withOptionManager(drillbitContext1.getOptionManager())
-      .build();
+        .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+        .withOptionManager(drillbitContext1.getOptionManager())
+        .build();
 
     final String passthroughDesc = "<<injected from descPassthrough>>";
     final int nSkip = 7;

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 1c219f0..508b10c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.testing;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -29,8 +30,9 @@ import static org.junit.Assert.fail;
 public class TestPauseInjection extends BaseTestQuery {
 
   private static final UserSession session = UserSession.Builder.newBuilder()
-    .withOptionManager(bits[0].getContext().getOptionManager())
-    .build();
+      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .withOptionManager(bits[0].getContext().getOptionManager())
+      .build();
 
   /**
    * Class whose methods we want to simulate pauses at run-time for testing

http://git-wip-us.apache.org/repos/asf/drill/blob/40c90403/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3b246d9..91707fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -945,7 +945,7 @@
           <dependency>
             <groupId>net.hydromatic</groupId>
             <artifactId>optiq-core</artifactId>
-            <version>0.9-drill-r21</version>
+            <version>0.9-drill-r21.1</version>
             <exclusions>
               <exclusion>
                 <groupId>org.jgrapht</groupId>

