drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [3/4] drill git commit: DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas
Date Tue, 21 Apr 2015 22:21:38 GMT
DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/117b7497
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/117b7497
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/117b7497

Branch: refs/heads/master
Commit: 117b749744d1775816fc5f9591dced3aae551352
Parents: fbb405b
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Tue Mar 10 10:19:35 2015 -0700
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Tue Apr 21 13:16:00 2015 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseSchemaFactory.java    |   2 +-
 .../exec/store/hbase/HBaseStoragePlugin.java    |   2 +-
 .../exec/store/hive/HiveStoragePlugin.java      |   2 +-
 .../store/hive/schema/HiveSchemaFactory.java    |   3 +-
 .../exec/store/mongo/MongoStoragePlugin.java    |   2 +-
 .../store/mongo/schema/MongoSchemaFactory.java  |   4 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  43 ++++++-
 .../org/apache/drill/exec/ops/QueryContext.java |  15 ++-
 .../apache/drill/exec/store/SchemaFactory.java  |   4 +-
 .../drill/exec/store/StoragePluginRegistry.java |   2 +-
 .../drill/exec/store/avro/AvroFormatPlugin.java |   9 +-
 .../exec/store/dfs/BasicFormatMatcher.java      |  25 ++--
 .../drill/exec/store/dfs/DrillFileSystem.java   |   6 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |  34 +++---
 .../exec/store/dfs/FileSystemSchemaFactory.java |  12 +-
 .../drill/exec/store/dfs/FormatCreator.java     |  17 +--
 .../drill/exec/store/dfs/FormatMatcher.java     |   2 +-
 .../drill/exec/store/dfs/FormatPlugin.java      |   3 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  | 115 ++++++++++---------
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  27 +++--
 .../exec/store/dfs/easy/EasyGroupScan.java      |   8 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |  15 ++-
 .../exec/store/easy/text/TextFormatPlugin.java  |  18 ++-
 .../store/ischema/InfoSchemaStoragePlugin.java  |   2 +-
 .../exec/store/mock/MockStorageEngine.java      |   2 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |  76 ++++++------
 .../exec/store/parquet/ParquetGroupScan.java    |  17 +--
 .../store/parquet/ParquetScanBatchCreator.java  |   8 +-
 .../drill/exec/store/sys/SystemTablePlugin.java |   2 +-
 .../exec/store/text/DrillTextRecordReader.java  |   6 +-
 .../exec/work/batch/ControlHandlerImpl.java     |   2 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  11 +-
 .../work/fragment/NonRootFragmentManager.java   |   2 +-
 .../exec/store/dfs/TestDrillFileSystem.java     |   2 +-
 34 files changed, 279 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 7a0a64b..1c407e1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -46,7 +46,7 @@ public class HBaseSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     HBaseSchema schema = new HBaseSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index c10b0ab..948d462 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -66,7 +66,7 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index f4baf3b..91e7a92 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -82,7 +82,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 0e16e6f..587e90d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.hive.schema;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -186,7 +187,7 @@ public class HiveSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     HiveSchema schema = new HiveSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index e46d8ec..dfad5ef 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -63,7 +63,7 @@ public class MongoStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index 3c70638..f650ccc 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mongo.schema;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,7 +37,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.store.mongo.MongoCnxnManager;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
@@ -120,7 +120,7 @@ public class MongoSchemaFactory implements SchemaFactory {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     MongoSchema schema = new MongoSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 9400355..7dfd0e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
@@ -64,7 +63,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
 
   private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
   private final DrillbitContext context;
-  private final UserClientConnection connection; // is null if attached to non-root fragment
+  private final UserClientConnection connection; // is null if this context is for non-root fragment
+  private final QueryContext queryContext; // is null if this context is for non-root fragment
   private final FragmentStats stats;
   private final FunctionImplementationRegistry funcRegistry;
   private final BufferAllocator allocator;
@@ -87,10 +87,34 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
   private final AccountingUserConnection accountingUserConnection;
 
+  /**
+   * Create a FragmentContext instance for non-root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
   public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
+      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, null, funcRegistry);
+  }
+
+  /**
+   * Create a FragmentContext instance for root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param queryContext QueryContext.
+   * @param connection UserClientConnection.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
+  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
       final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
     throws ExecutionSetupException {
     this.context = dbContext;
+    this.queryContext = queryContext;
     this.connection = connection;
     this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
     this.fragment = fragment;
@@ -128,6 +152,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     bufferManager = new BufferManager(this.allocator, this);
   }
 
+  /**
+   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
+   * the long list of test files.
+   */
+  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
+      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, connection, funcRegistry);
+  }
+
   public OptionManager getOptions() {
     return fragmentOptions;
   }
@@ -162,15 +195,13 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   }
 
   public SchemaPlus getRootSchema() {
-    if (connection == null) {
+    if (queryContext == null) {
       fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
           "This is a non-root fragment."));
       return null;
     }
 
-    final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
-    context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
-    return root;
+    return queryContext.getRootSchema();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 2dcac25..cd5c054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.ops;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import io.netty.buffer.DrillBuf;
@@ -46,7 +47,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
 // TODO - consider re-name to PlanningContext, as the query execution context actually appears
 // in fragment contexts
 public class QueryContext implements AutoCloseable, UdfUtilities {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
   private final DrillbitContext drillbitContext;
   private final UserSession session;
@@ -113,9 +114,15 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
   }
 
   public SchemaPlus getRootSchema() {
-    final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
-    drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
-    return rootSchema;
+    try {
+      final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
+      drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
+      return rootSchema;
+    } catch(IOException e) {
+      final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
+      logger.error(errMsg, e);
+      throw new DrillRuntimeException(errMsg, e);
+    }
   }
 
   public OptionManager getOptions() {

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
index feadabd..14d2fab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -21,8 +21,10 @@ import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.exec.rpc.user.UserSession;
 
+import java.io.IOException;
+
 public interface SchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
 
-  public void registerSchemas(UserSession session, SchemaPlus parent);
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 5d0eed6..cb9ee0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -301,7 +301,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
   public class DrillSchemaFactory implements SchemaFactory {
 
     @Override
-    public void registerSchemas(UserSession session, SchemaPlus parent) {
+    public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
       Stopwatch watch = new Stopwatch();
       watch.start();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 2f487d6..30c45fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
 import java.util.List;
@@ -40,13 +41,13 @@ import java.util.List;
  */
 public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
 
-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
                           StoragePluginConfig storagePluginConfig) {
-    this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+    this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
   }
 
-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
+  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 9756f3c..3768aea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -36,34 +37,32 @@ import com.google.common.collect.Range;
 public class BasicFormatMatcher extends FormatMatcher{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
 
-  private final List<Pattern> patterns;
-  private final MagicStringMatcher matcher;
-  protected final DrillFileSystem fs;
   protected final FormatPlugin plugin;
   protected final boolean compressible;
   protected final CompressionCodecFactory codecFactory;
 
-  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
+  private final List<Pattern> patterns;
+  private final MagicStringMatcher matcher;
+
+  public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
     super();
     this.patterns = ImmutableList.copyOf(patterns);
     this.matcher = new MagicStringMatcher(magicStrings);
-    this.fs = fs;
     this.plugin = plugin;
     this.compressible = false;
     this.codecFactory = null;
   }
 
-  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible) {
+  public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
     List<Pattern> patterns = Lists.newArrayList();
     for (String extension : extensions) {
       patterns.add(Pattern.compile(".*\\." + extension));
     }
     this.patterns = patterns;
     this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
-    this.fs = fs;
     this.plugin = plugin;
     this.compressible = compressible;
-    this.codecFactory = new CompressionCodecFactory(fs.getConf());
+    this.codecFactory = new CompressionCodecFactory(fsConf);
   }
 
   @Override
@@ -72,8 +71,8 @@ public class BasicFormatMatcher extends FormatMatcher{
   }
 
   @Override
-  public FormatSelection isReadable(FileSelection selection) throws IOException {
-    if (isReadable(selection.getFirstPath(fs))) {
+  public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
+    if (isReadable(fs, selection.getFirstPath(fs))) {
       if (plugin.getName() != null) {
         NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
         namedConfig.name = plugin.getName();
@@ -85,7 +84,7 @@ public class BasicFormatMatcher extends FormatMatcher{
     return null;
   }
 
-  protected final boolean isReadable(FileStatus status) throws IOException {
+  protected final boolean isReadable(DrillFileSystem fs, FileStatus status) throws IOException {
     CompressionCodec codec = null;
     if (compressible) {
       codec = codecFactory.getCodec(status.getPath());
@@ -103,7 +102,7 @@ public class BasicFormatMatcher extends FormatMatcher{
       }
     }
 
-    if (matcher.matches(status)) {
+    if (matcher.matches(fs, status)) {
       return true;
     }
     return false;
@@ -128,7 +127,7 @@ public class BasicFormatMatcher extends FormatMatcher{
       }
     }
 
-    public boolean matches(FileStatus status) throws IOException{
+    public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
       if (ranges.isEmpty()) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 2683cca..f8afe3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -96,11 +96,11 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   private final OperatorStats operatorStats;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
-    this(FileSystem.get(fsConf), null);
+    this(fsConf, null);
   }
 
-  public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
-    this.underlyingFs = fs;
+  public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
+    this.underlyingFs = FileSystem.get(fsConf);
     this.operatorStats = operatorStats;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index c5ca41b..775b402 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -44,6 +44,8 @@ import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import static org.apache.drill.exec.store.dfs.FileSystemSchemaFactory.DEFAULT_WS_NAME;
+
 /**
  * A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
  * LocalFileSystem, as well Apache Drill specific CachedFileSystem, ClassPathFileSystem and LocalSyncableFileSystem.
@@ -51,26 +53,26 @@ import com.google.common.collect.Maps;
  * references to the FileSystem configuration and path management.
  */
 public class FileSystemPlugin extends AbstractStoragePlugin{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
 
   private final FileSystemSchemaFactory schemaFactory;
-  private Map<String, FormatPlugin> formatPluginsByName;
-  private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
-  private FileSystemConfig config;
-  private DrillbitContext context;
-  private final DrillFileSystem fs;
+  private final Map<String, FormatPlugin> formatPluginsByName;
+  private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
+  private final FileSystemConfig config;
+  private final DrillbitContext context;
+  private final Configuration fsConf;
 
   public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{
     try {
       this.config = config;
       this.context = context;
 
-      Configuration fsConf = new Configuration();
+      fsConf = new Configuration();
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
-      fs = new DrillFileSystem(fsConf);
-      formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config);
+
+      formatPluginsByName = FormatCreator.getFormatPlugins(context, fsConf, config);
       List<FormatMatcher> matchers = Lists.newArrayList();
       formatPluginsByConfig = Maps.newHashMap();
       for (FormatPlugin p : formatPluginsByName.values()) {
@@ -78,17 +80,17 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
         formatPluginsByConfig.put(p.getConfig(), p);
       }
 
-      boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
+      final boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
       List<WorkspaceSchemaFactory> factories = Lists.newArrayList();
       if (!noWorkspace) {
         for (Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()) {
-          factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, fs, space.getValue(), matchers));
+          factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, space.getValue(), matchers));
         }
       }
 
       // if the "default" workspace is not given add one.
-      if (noWorkspace || !config.workspaces.containsKey("default")) {
-        factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers));
+      if (noWorkspace || !config.workspaces.containsKey(DEFAULT_WS_NAME)) {
+        factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, DEFAULT_WS_NAME, name, WorkspaceConfig.DEFAULT, matchers));
       }
 
       this.schemaFactory = new FileSystemSchemaFactory(name, factories);
@@ -123,7 +125,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 
@@ -151,5 +153,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
     return setBuilder.build();
   }
 
-
+  public Configuration getFsConf() {
+    return fsConf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 44132d0..e11712e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -44,12 +44,12 @@ import org.apache.hadoop.fs.Path;
  * This is the top level schema that responds to root level path requests. Also supports
  */
 public class FileSystemSchemaFactory implements SchemaFactory{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+
+  public static final String DEFAULT_WS_NAME = "default";
 
   private List<WorkspaceSchemaFactory> factories;
   private String schemaName;
-  private final String defaultSchemaName = "default";
-
 
   public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
     super();
@@ -58,7 +58,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     FileSystemSchema schema = new FileSystemSchema(schemaName, session);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
     schema.setPlus(plusOfThis);
@@ -69,14 +69,14 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     private final WorkspaceSchema defaultSchema;
     private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
 
-    public FileSystemSchema(String name, UserSession session) {
+    public FileSystemSchema(String name, UserSession session) throws IOException {
       super(ImmutableList.<String>of(), name);
       for(WorkspaceSchemaFactory f :  factories){
         WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
         schemaMap.put(s.getName(), s);
       }
 
-      defaultSchema = schemaMap.get(defaultSchemaName);
+      defaultSchema = schemaMap.get(DEFAULT_WS_NAME);
     }
 
     void setPlus(SchemaPlus plusOfThis){

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index c164ed5..d2a903b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -31,20 +31,23 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.DrillbitContext;
 
 import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 
 public class FormatCreator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
 
-  static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class);
-  static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
+  private static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+      Configuration.class, StoragePluginConfig.class, FormatPluginConfig.class);
+  private static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+      Configuration.class, StoragePluginConfig.class);
 
-  static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig) {
+  static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, Configuration fsConf,
+      FileSystemConfig storageConfig) {
     final DrillConfig config = context.getConfig();
     Map<String, FormatPlugin> plugins = Maps.newHashMap();
 
     Collection<Class<? extends FormatPlugin>> pluginClasses = PathScanner.scanForImplementations(FormatPlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
 
-
     if (storageConfig.formats == null || storageConfig.formats.isEmpty()) {
 
       for (Class<? extends FormatPlugin> pluginClass: pluginClasses) {
@@ -53,7 +56,7 @@ public class FormatCreator {
             if (!DEFAULT_BASED.check(c)) {
               continue;
             }
-            FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fileSystem, storageConfig);
+            FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fsConf, storageConfig);
             plugins.put(plugin.getName(), plugin);
           } catch (Exception e) {
             logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
@@ -84,7 +87,7 @@ public class FormatCreator {
           continue;
         }
         try {
-          plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue()));
+          plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fsConf, storageConfig, e.getValue()));
         } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
           logger.warn("Failure initializing storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName(), e1);
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
index 92e3d0a..0b8c7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
@@ -23,6 +23,6 @@ public abstract class FormatMatcher {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatMatcher.class);
 
   public abstract boolean supportDirectoryReads();
-  public abstract FormatSelection isReadable(FileSelection selection) throws IOException;
+  public abstract FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException;
   public abstract FormatPlugin getFormatPlugin();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 58d5b42..955dfeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Similar to a storage engine but built specifically to work within a FileSystem context.
@@ -51,7 +52,7 @@ public interface FormatPlugin {
 
   public FormatPluginConfig getConfig();
   public StoragePluginConfig getStorageConfig();
-  public DrillFileSystem getFileSystem();
+  public Configuration getFsConf();
   public DrillbitContext getContext();
   public String getName();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 45e9129..a536350 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -52,14 +53,14 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+public class WorkspaceSchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
 
   private final List<FormatMatcher> fileMatchers;
   private final List<FormatMatcher> dirMatchers;
 
   private final WorkspaceConfig config;
-  private final DrillFileSystem fs;
+  private final Configuration fsConf;
   private final DrillConfig drillConfig;
   private final String storageEngineName;
   private final String schemaName;
@@ -67,9 +68,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
   private final ObjectMapper mapper;
 
   public WorkspaceSchemaFactory(DrillConfig drillConfig, FileSystemPlugin plugin, String schemaName,
-      String storageEngineName, DrillFileSystem fileSystem, WorkspaceConfig config,
-      List<FormatMatcher> formatMatchers) throws ExecutionSetupException, IOException {
-    this.fs = fileSystem;
+      String storageEngineName, WorkspaceConfig config, List<FormatMatcher> formatMatchers)
+    throws ExecutionSetupException, IOException {
+    this.fsConf = plugin.getFsConf();
     this.plugin = plugin;
     this.drillConfig = drillConfig;
     this.config = config;
@@ -95,7 +96,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
             defaultInputFormat, storageEngineName, schemaName);
         throw new ExecutionSetupException(message);
       }
-      final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin, fs,
+      final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
           ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of());
       fileMatchers.add(fallbackMatcher);
     }
@@ -105,54 +106,21 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
     return DotDrillType.VIEW.getPath(config.getLocation(), name);
   }
 
-  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) {
+  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
     return new WorkspaceSchema(parentSchemaPath, schemaName, session);
   }
 
-  @Override
-  public DrillTable create(String key) {
-    try {
+  public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+    private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
+    private final UserSession session;
+    private final DrillFileSystem fs;
 
-      FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
-      if (fileSelection == null) {
-        return null;
-      }
-
-      if (fileSelection.containsDirectories(fs)) {
-        for (FormatMatcher m : dirMatchers) {
-          try {
-            Object selection = m.isReadable(fileSelection);
-            if (selection != null) {
-              return new DynamicDrillTable(plugin, storageEngineName, selection);
-            }
-          } catch (IOException e) {
-            logger.debug("File read failed.", e);
-          }
-        }
-        fileSelection = fileSelection.minusDirectories(fs);
-      }
-
-      for (FormatMatcher m : fileMatchers) {
-        Object selection = m.isReadable(fileSelection);
-        if (selection != null) {
-          return new DynamicDrillTable(plugin, storageEngineName, selection);
-        }
-      }
-      return null;
-
-    } catch (IOException e) {
-      logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+      super(parentSchemaPath, wsName);
+      this.session = session;
+      this.fs = new DrillFileSystem(fsConf);
     }
 
-    return null;
-  }
-
-  @Override
-  public void destroy(DrillTable value) {
-  }
-
-  public class WorkspaceSchema extends AbstractSchema {
-
     public boolean createView(View view) throws Exception {
       Path viewPath = getViewPath(view.getName());
       boolean replaced = fs.exists(viewPath);
@@ -186,15 +154,6 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
       fs.delete(getViewPath(viewName), false);
     }
 
-    private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(WorkspaceSchemaFactory.this);
-
-    private UserSession session;
-
-    public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) {
-      super(parentSchemaPath, name);
-      this.session = session;
-    }
-
     private Set<String> getViews() {
       Set<String> viewSet = Sets.newHashSet();
       // Look for files with ".view.drill" extension.
@@ -282,5 +241,47 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
     public String getTypeName() {
       return FileSystemConfig.NAME;
     }
+
+    @Override
+    public DrillTable create(String key) {
+      try {
+
+        FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
+        if (fileSelection == null) {
+          return null;
+        }
+
+        if (fileSelection.containsDirectories(fs)) {
+          for (FormatMatcher m : dirMatchers) {
+            try {
+              Object selection = m.isReadable(fs, fileSelection);
+              if (selection != null) {
+                return new DynamicDrillTable(plugin, storageEngineName, selection);
+              }
+            } catch (IOException e) {
+              logger.debug("File read failed.", e);
+            }
+          }
+          fileSelection = fileSelection.minusDirectories(fs);
+        }
+
+        for (FormatMatcher m : fileMatchers) {
+          Object selection = m.isReadable(fs, fileSelection);
+          if (selection != null) {
+            return new DynamicDrillTable(plugin, storageEngineName, selection);
+          }
+        }
+        return null;
+
+      } catch (IOException e) {
+        logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+      }
+
+      return null;
+    }
+
+    @Override
+    public void destroy(DrillTable value) {
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 6e1e0cc..5c7152a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -54,38 +54,39 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
 
   private final BasicFormatMatcher matcher;
   private final DrillbitContext context;
   private final boolean readable;
   private final boolean writable;
   private final boolean blockSplittable;
-  private final DrillFileSystem fs;
+  private final Configuration fsConf;
   private final StoragePluginConfig storageConfig;
   protected final FormatPluginConfig formatConfig;
   private final String name;
   protected final CompressionCodecFactory codecFactory;
   private final boolean compressible;
 
-  protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
-                             T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
-    this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
+  protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
+      boolean compressible, List<String> extensions, String defaultName){
+    this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
     this.readable = readable;
     this.writable = writable;
     this.context = context;
     this.blockSplittable = blockSplittable;
     this.compressible = compressible;
-    this.fs = fs;
+    this.fsConf = fsConf;
     this.storageConfig = storageConfig;
     this.formatConfig = formatConfig;
     this.name = name == null ? defaultName : name;
-    this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getConf()));
+    this.codecFactory = new CompressionCodecFactory(new Configuration(fsConf));
   }
 
   @Override
-  public DrillFileSystem getFileSystem() {
-    return fs;
+  public Configuration getFsConf() {
+    return fsConf;
   }
 
   @Override
@@ -152,7 +153,13 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     int numParts = 0;
     OperatorContext oContext = new OperatorContext(scan, context,
         false /* ScanBatch is not subject to fragment memory limit */);
-    DrillFileSystem dfs = new DrillFileSystem(fs, oContext.getStats());
+    DrillFileSystem dfs;
+    try {
+      dfs = new DrillFileSystem(fsConf, oContext.getStats());
+    } catch (IOException e) {
+      throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+    }
+
     for(FileWork work : scan.getWorkUnits()){
       readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
       if (scan.getSelectionRoot() != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 54cad56..7c70df3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
@@ -51,7 +52,7 @@ import com.google.common.collect.Lists;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
 
   private FileSelection selection;
   private final EasyFormatPlugin<?> formatPlugin;
@@ -109,9 +110,10 @@ public class EasyGroupScan extends AbstractFileGroupScan{
   }
 
   private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
+    final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
     this.selection = selection;
-    BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem(), formatPlugin.getContext().getBits());
-    this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+    BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
+    this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
     this.maxWidth = chunks.size();
     this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 2e65466..32e34b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -38,22 +39,26 @@ import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
   private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "json";
+  private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
 
-  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
-    this(name, context, fs, storageConfig, new JSONFormatConfig());
+  public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+    this(name, context, fsConf, storageConfig, new JSONFormatConfig());
   }
 
-  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, Lists.newArrayList("json"), "json");
+  public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+      JSONFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE,
+        DEFAULT_EXTS, DEFAULT_NAME);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bf46395..237589c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.store.easy.text;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +42,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -51,13 +52,17 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
+  private final static String DEFAULT_NAME = "text";
 
-  public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
-    super(name, context, fs, storageConfig, new TextFormatConfig(), true, false, true, true, new ArrayList<String>(), "text");
+  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+    super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true,
+        Collections.<String>emptyList(), DEFAULT_NAME);
   }
 
-  public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, TextFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, true, true, formatPluginConfig.getExtensions(), "text");
+  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+      TextFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, true, true,
+        formatPluginConfig.getExtensions(), DEFAULT_NAME);
   }
 
 
@@ -67,7 +72,8 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     Path path = dfs.makeQualified(new Path(fileWork.getPath()));
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
     Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
-    return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+    return new DrillTextRecordReader(split, getFsConf(), context,
+        ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 77c6b9a..a1249e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -69,7 +69,7 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin implements In
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     ISchema s = new ISchema(parent, this);
     parent.add(s.getName(), s);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 51b2208..96226a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -55,7 +55,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index e204a2c..9c83ea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -62,37 +62,44 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class ParquetFormatPlugin implements FormatPlugin{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
 
-  private final DrillbitContext context;
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-  private CodecFactoryExposer codecFactoryExposer;
-  private final DrillFileSystem fs;
+
+  private static final String DEFAULT_NAME = "parquet";
+
+  private static final List<Pattern> PATTERNS = Lists.newArrayList(
+      Pattern.compile(".*\\.parquet$"),
+      Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE));
+  private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
+
+  private final DrillbitContext context;
+  private final CodecFactoryExposer codecFactoryExposer;
+  private final Configuration fsConf;
   private final ParquetFormatMatcher formatMatcher;
   private final ParquetFormatConfig config;
   private final StoragePluginConfig storageConfig;
   private final String name;
 
-  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
-    this(name, context, fs, storageConfig, new ParquetFormatConfig());
+  public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig){
+    this(name, context, fsConf, storageConfig, new ParquetFormatConfig());
   }
 
-  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
+  public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
-    this.codecFactoryExposer = new CodecFactoryExposer(fs.getConf());
+    this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
     this.config = formatConfig;
-    this.formatMatcher = new ParquetFormatMatcher(this, fs);
+    this.formatMatcher = new ParquetFormatMatcher(this);
     this.storageConfig = storageConfig;
-    this.fs = fs;
-    this.name = name == null ? "parquet" : name;
-  }
-
-  Configuration getHadoopConfig() {
-    return fs.getConf();
+    this.fsConf = fsConf;
+    this.name = name == null ? DEFAULT_NAME : name;
   }
 
-  public DrillFileSystem getFileSystem() {
-    return fs;
+  @Override
+  public Configuration getFsConf() {
+    return fsConf;
   }
 
   @Override
@@ -155,12 +162,13 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
   @Override
   public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
+    return getGroupScan(selection, null);
   }
 
   @Override
   public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+    final DrillFileSystem dfs = new DrillFileSystem(fsConf);
+    return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
   }
 
   @Override
@@ -190,20 +198,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
   private static class ParquetFormatMatcher extends BasicFormatMatcher{
 
-    private final DrillFileSystem fs;
-
-    public ParquetFormatMatcher(ParquetFormatPlugin plugin, DrillFileSystem fs) {
-      super(plugin, fs, //
-          Lists.newArrayList( //
-              Pattern.compile(".*\\.parquet$"), //
-              Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE) //
-              //
-              ),
-          Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC))
-
-          );
-      this.fs = fs;
-
+    public ParquetFormatMatcher(ParquetFormatPlugin plugin) {
+      super(plugin, PATTERNS, MAGIC_STRINGS);
     }
 
     @Override
@@ -212,17 +208,17 @@ public class ParquetFormatPlugin implements FormatPlugin{
     }
 
     @Override
-    public FormatSelection isReadable(FileSelection selection) throws IOException {
+    public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
       // TODO: we only check the first file for directory reading (presumably to avoid the cost of checking every file; confirm).
       if(selection.containsDirectories(fs)){
-        if(isDirReadable(selection.getFirstPath(fs))){
+        if(isDirReadable(fs, selection.getFirstPath(fs))){
           return new FormatSelection(plugin.getConfig(), selection);
         }
       }
-      return super.isReadable(selection);
+      return super.isReadable(fs, selection);
     }
 
-    boolean isDirReadable(FileStatus dir) {
+    boolean isDirReadable(DrillFileSystem fs, FileStatus dir) {
       Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
       try {
         if (fs.exists(p)) {
@@ -235,16 +231,12 @@ public class ParquetFormatPlugin implements FormatPlugin{
           if (files.length == 0) {
             return false;
           }
-          return super.isReadable(files[0]);
+          return super.isReadable(fs, files[0]);
         }
       } catch (IOException e) {
         logger.info("Failure while attempting to check for Parquet metadata file.", e);
         return false;
       }
     }
-
-
-
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index acac61f..a59f2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
@@ -76,11 +77,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
-  static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
-  static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
-  static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
-
-  final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
 
   private ListMultimap<Integer, RowGroupInfo> mappings;
   private List<RowGroupInfo> rowGroupInfos;
@@ -135,7 +131,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin);
-    this.fs = formatPlugin.getFileSystem();
+    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
@@ -155,7 +151,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.formatPlugin = formatPlugin;
     this.columns = columns;
     this.formatConfig = formatPlugin.getConfig();
-    this.fs = formatPlugin.getFileSystem();
+    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
 
     this.entries = Lists.newArrayList();
     for (FileStatus file : files) {
@@ -205,7 +201,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     ColumnChunkMetaData columnChunkMetaData;
 
-    List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getHadoopConfig(), statuses, 16);
+    List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
     for (Footer footer : footers) {
       int index = 0;
       ParquetMetadata metadata = footer.getParquetMetadata();
@@ -260,11 +256,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
-  @JsonIgnore
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
   @Override
   public void modifyFileSelection(FileSelection selection) {
     entries.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index c1f815e..b1c725c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.column.ColumnDescriptor;
@@ -95,7 +94,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       rowGroupScan.setOperatorId(id);
     }
 
-    DrillFileSystem fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFileSystem(), oContext.getStats());
+    DrillFileSystem fs;
+    try {
+      fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+    } catch(IOException e) {
+      throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
+    }
     Configuration conf = fs.getConf();
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index b92f98c..4a3b97b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -67,7 +67,7 @@ public class SystemTablePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     parent.add(schema.getName(), schema);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 1ad053d..3368412 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.vector.RepeatedVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
@@ -69,7 +70,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   private FileSplit split;
   private long totalRecordsRead;
 
-  public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
+  public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context, char delimiter,
+      List<SchemaPath> columns) {
     this.fragmentContext = context;
     this.delimiter = (byte) delimiter;
     this.split = split;
@@ -95,7 +97,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
     targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
 
     TextInputFormat inputFormat = new TextInputFormat();
-    JobConf job = new JobConf();
+    JobConf job = new JobConf(fsConf);
     job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
     job.setInputFormat(inputFormat.getClass());
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3fb0775..b6c6852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -124,7 +124,7 @@ public class ControlHandlerImpl implements ControlMessageHandler {
     try {
       // We need to either start the fragment if it is a leaf fragment, or set up a fragment manager if it is a non-leaf fragment.
       if (fragment.getLeafFragment()) {
-        final FragmentContext context = new FragmentContext(drillbitContext, fragment, null,
+        final FragmentContext context = new FragmentContext(drillbitContext, fragment,
             drillbitContext.getFunctionImplementationRegistry());
         final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
         final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d94ffba..cb2753c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -348,7 +348,7 @@ public class Foreman implements Runnable {
     injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
 
     // set up the root fragment first so we'll have incoming buffers available.
-    setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
+    setupRootFragment(rootPlanFragment, work.getRootOperator());
 
     setupNonRootFragments(planFragments);
     drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!?
@@ -791,15 +791,14 @@ public class Foreman implements Runnable {
    * Set up the root fragment (which will run locally), and submit it for execution.
    *
    * @param rootFragment
-   * @param rootClient
    * @param rootOperator
    * @throws ExecutionSetupException
    */
-  private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient,
-      final FragmentRoot rootOperator) throws ExecutionSetupException {
+  private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator)
+      throws ExecutionSetupException {
     @SuppressWarnings("resource")
-    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient,
-        drillbitContext.getFunctionImplementationRegistry());
+    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
+        initiatingClient, drillbitContext.getFunctionImplementationRegistry());
     @SuppressWarnings("resource")
     final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
     rootContext.setBuffers(buffers);

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index a5b928b..f526fbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -56,7 +56,7 @@ public class NonRootFragmentManager implements FragmentManager {
     try {
       this.fragment = fragment;
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
-      this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
+      this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(root, this.context);
       final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
           fragment.getForeman()));

http://git-wip-us.apache.org/repos/asf/drill/blob/117b7497/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index ab6639e..550f56f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -74,7 +74,7 @@ public class TestDrillFileSystem {
     stats.startProcessing();
 
     try {
-      dfs = new DrillFileSystem(FileSystem.get(conf), stats);
+      dfs = new DrillFileSystem(conf, stats);
       is = dfs.open(new Path(tempFilePath));
 
       byte[] buf = new byte[8000];


Mime
View raw message