drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [1/5] drill git commit: DRILL-4956: Temporary tables support
Date Tue, 24 Jan 2017 06:15:21 GMT
Repository: drill
Updated Branches:
  refs/heads/master 8a4d7a994 -> 2af709f43


http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
new file mode 100644
index 0000000..a125bae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Contains list of parameters that will be used to store path / files on file system. */
+public class StorageStrategy {
+
+  /**
+   * Primarily used for persistent tables.
+   * For directories: drwxrwxr-x (owner and group have full access, others can read and execute).
+   * For files: -rw-r--r-- (owner can read and write, group and others can read).
+   * Folders and files are not deleted on file system close.
+   */
+  public static final StorageStrategy PERSISTENT = new StorageStrategy("775", "644", false);
+
+  /**
+   * Primarily used for temporary tables.
+   * For directories: drwx------ (owner has full access, group and others have no access).
+   * For files: -rw------- (owner can read and write, group and others have no access).
+   * Folders and files are deleted on file system close.
+   */
+  public static final StorageStrategy TEMPORARY = new StorageStrategy("700", "600", true);
+
+  private final String folderPermission;
+  private final String filePermission;
+  private final boolean deleteOnExit;
+
+  @JsonCreator
+  public StorageStrategy(@JsonProperty("folderPermission") String folderPermission,
+                         @JsonProperty("filePermission") String filePermission,
+                         @JsonProperty("deleteOnExit") boolean deleteOnExit) {
+    this.folderPermission = folderPermission;
+    this.filePermission = filePermission;
+    this.deleteOnExit = deleteOnExit;
+  }
+
+  public String getFolderPermission() {
+    return folderPermission;
+  }
+
+  public String getFilePermission() { return filePermission; }
+
+  public boolean isDeleteOnExit() {
+    return deleteOnExit;
+  }
+
+  /**
+   * Creates passed path on appropriate file system.
+   * Before creation checks which parent directories do not exist.
+   * Applies storage strategy rules to all newly created directories.
+   * Will return first created path or null if path already existed.
+   *
+   * Case 1: /a/b -> already exists, attempt to create /a/b/c/d
+   * Will create path and return /a/b/c.
+   * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/d
+   * Will create path and return /a/b/c/d.
+   * Case 3: /a/b/c/d -> already exists, will return null.
+   *
+   * @param fs file system where file should be located
+   * @param path location path
+   * @return first created path or null if path already existed
+   * @throws IOException is thrown in case of problems while creating path, setting permission
+   *         or adding path to delete on exit list
+   */
+  public Path createPathAndApply(FileSystem fs, Path path) throws IOException {
+    List<Path> locations = getNonExistentLocations(fs, path);
+    if (locations.isEmpty()) {
+      return null;
+    }
+    fs.mkdirs(path);
+    for (Path location : locations) {
+      applyStrategy(fs, location, folderPermission, deleteOnExit);
+    }
+    return locations.get(locations.size() - 1);
+  }
+
+  /**
+   * Creates passed file on appropriate file system.
+   * Before creation checks which parent directories do not exist.
+   * Applies storage strategy rules to all newly created directories and file.
+   * Will return first created parent path or file if no new parent paths created.
+   *
+   * Case 1: /a/b -> already exists, attempt to create /a/b/c/some_file.txt
+   * Will create file and return /a/b/c.
+   * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/some_file.txt
+   * Will create file and return /a/b/c/some_file.txt.
+   * Case 3: /a/b/c/some_file.txt -> already exists, will fail.
+   *
+   * @param fs file system where file should be located
+   * @param file file path
+   * @return first created parent path or file
+   * @throws IOException is thrown in case of problems while creating path, setting permission
+   *         or adding path to delete on exit list
+   */
+  public Path createFileAndApply(FileSystem fs, Path file) throws IOException {
+    List<Path> locations = getNonExistentLocations(fs, file.getParent());
+    if (!fs.createNewFile(file)) {
+      throw new IOException(String.format("File [%s] already exists on file system [%s].",
+          file.toUri().getPath(), fs.getUri()));
+    }
+    applyToFile(fs, file);
+
+    if (locations.isEmpty()) {
+      return file;
+    }
+
+    for (Path location : locations) {
+      applyStrategy(fs, location, folderPermission, deleteOnExit);
+    }
+    return locations.get(locations.size() - 1);
+  }
+
+  /**
+   * Applies storage strategy to file:
+   * sets permission and adds to file system delete on exit list if needed.
+   *
+   * @param fs file system
+   * @param file path to file
+   * @throws IOException is thrown in case of problems while setting permission
+   *         or adding file to delete on exit list
+   */
+  public void applyToFile(FileSystem fs, Path file) throws IOException {
+    applyStrategy(fs, file, filePermission, deleteOnExit);
+  }
+
+  /**
+   * Returns list of parent locations that do not exist, including initial location.
+   * First in the list will be initial location,
+   * last in the list will be last parent location that does not exist.
+   * If all locations exist, empty list will be returned.
+   *
+   * Case 1: if /a/b exists and passed location is /a/b/c/d,
+   * will return list with two elements: 0 -> /a/b/c/d, 1 -> /a/b/c
+   * Case 2: if /a/b exists and passed location is /a/b, will return empty list.
+   *
+   * @param fs file system where locations should be located
+   * @param path location path
+   * @return list of locations that do not exist
+   * @throws IOException in case of troubles accessing file system
+   */
+  private List<Path> getNonExistentLocations(FileSystem fs, Path path) throws IOException {
+    List<Path> locations = Lists.newArrayList();
+    Path starting = path;
+    while (starting != null && !fs.exists(starting)) {
+      locations.add(starting);
+      starting = starting.getParent();
+    }
+    return locations;
+  }
+
+  /**
+   * Applies storage strategy to passed path on passed file system.
+   * Sets appropriate permission
+   * and adds to file system delete on exit list if needed.
+   *
+   * @param fs file system where path is located
+   * @param path path location
+   * @param permission permission to be applied
+   * @param deleteOnExit whether to delete path on exit
+   * @throws IOException is thrown in case of problems while setting permission
+   *         or adding path to delete on exit list
+   */
+  private void applyStrategy(FileSystem fs, Path path, String permission, boolean deleteOnExit) throws IOException {
+    fs.setPermission(path, new FsPermission(permission));
+    if (deleteOnExit) {
+      fs.deleteOnExit(path);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
index e502e99..2110f38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -63,8 +63,8 @@ public class SubSchemaWrapper extends AbstractSchema {
   }
 
   @Override
-  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
-    return innerSchema.createNewTable(tableName, partitionColumns);
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+    return innerSchema.createNewTable(tableName, partitionColumns, storageStrategy);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index e0f5438..3a89591 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -288,6 +288,9 @@ public class FileSelection {
     }
     final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().toString());
     logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+    if (fileSel == null) {
+      return null;
+    }
     fileSel.setHadWildcard(hasWildcard);
     return fileSel;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 526dfb1..e3e01c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,12 +24,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -150,8 +148,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     }
 
     @Override
-    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
-      return defaultSchema.createNewTable(tableName, partitionColumns);
+    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
+      return defaultSchema.createNewTable(tableName, partitionColumns, storageStrategy);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index dac313b..8416ed8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -53,13 +53,13 @@ import org.apache.drill.exec.dotdrill.DotDrillFile;
 import org.apache.drill.exec.dotdrill.DotDrillType;
 import org.apache.drill.exec.dotdrill.DotDrillUtil;
 import org.apache.drill.exec.dotdrill.View;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
 import org.apache.drill.exec.planner.logical.DrillViewTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
-import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
@@ -522,7 +522,6 @@ public class WorkspaceSchemaFactory {
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
       }
-
       return tables.get(tableKey);
     }
 
@@ -540,7 +539,7 @@ public class WorkspaceSchemaFactory {
     }
 
     @Override
-    public CreateTableEntry createNewTable(String tableName, List<String> partitonColumns) {
+    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
       String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
       FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
       if (formatPlugin == null) {
@@ -553,7 +552,8 @@ public class WorkspaceSchemaFactory {
           (FileSystemConfig) plugin.getConfig(),
           formatPlugin,
           config.getLocation() + Path.SEPARATOR + tableName,
-          partitonColumns);
+          partitionColumns,
+          storageStrategy);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index db22568..52ce8b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -48,6 +48,7 @@ public class EasyWriter extends AbstractWriter {
       @JsonProperty("child") PhysicalOperator child,
       @JsonProperty("location") String location,
       @JsonProperty("partitionColumns") List<String> partitionColumns,
+      @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
       @JsonProperty("storage") StoragePluginConfig storageConfig,
       @JsonProperty("format") FormatPluginConfig formatConfig,
       @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
@@ -57,6 +58,7 @@ public class EasyWriter extends AbstractWriter {
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
     this.partitionColumns = partitionColumns;
+    setStorageStrategy(storageStrategy);
   }
 
   public EasyWriter(PhysicalOperator child,
@@ -92,7 +94,9 @@ public class EasyWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new EasyWriter(child, location, partitionColumns, formatPlugin);
+    EasyWriter writer = new EasyWriter(child, location, partitionColumns, formatPlugin);
+    writer.setStorageStrategy(getStorageStrategy());
+    return writer;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 30c248e..58ca95f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -83,7 +83,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
     options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
 
-    RecordWriter recordWriter = new JsonRecordWriter();
+    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
     recordWriter.init(options);
 
     return recordWriter;

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index f27e04c..c37da8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
@@ -46,6 +46,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
   private static final String LINE_FEED = String.format("%n");
 
+  private Path cleanUpLocation;
   private String location;
   private String prefix;
 
@@ -58,11 +59,13 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   private FSDataOutputStream stream = null;
 
   private final JsonFactory factory = new JsonFactory();
+  private final StorageStrategy storageStrategy;
 
   // Record write status
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
 
-  public JsonRecordWriter(){
+  public JsonRecordWriter(StorageStrategy storageStrategy){
+    this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
   }
 
   @Override
@@ -81,7 +84,17 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
+      // json writer does not support partitions, so only one file can be created
+      // and thus only one location should be deleted in case of abort
+      // to ensure that our writer was the first to create output file,
+      // we create empty output file first and fail if file exists
+      cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+      // since empty output file will be overwritten (some file systems may restrict append option)
+      // we need to re-apply file permission
       stream = fs.create(fileName);
+      storageStrategy.applyToFile(fs, fileName);
+
       JsonGenerator generator = factory.createGenerator(stream).useDefaultPrettyPrinter();
       if (uglify) {
         generator = generator.setPrettyPrinter(new MinimalPrettyPrinter(LINE_FEED));
@@ -238,6 +251,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
   @Override
   public void abort() throws IOException {
+    if (cleanUpLocation != null) {
+      fs.delete(cleanUpLocation, true);
+      logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+          cleanUpLocation.toUri().getPath(), fs.getUri());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 6542ad4..a9a30e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -125,7 +125,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
 
     options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
 
-    RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator());
+    RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
     recordWriter.init(options);
 
     return recordWriter;

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 4ee863a..a25699d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -80,6 +81,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   public static final String DRILL_VERSION_PROPERTY = "drill.version";
   public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
 
+  private final StorageStrategy storageStrategy;
   private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
   private Map<String, String> extraMetaData = new HashMap<>();
@@ -101,7 +103,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private BatchSchema batchSchema;
 
   private Configuration conf;
+  private FileSystem fs;
   private String location;
+  private List<Path> cleanUpLocations;
   private String prefix;
   private int index = 0;
   private OperatorContext oContext;
@@ -117,6 +121,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
     this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
     this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
+    this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.PERSISTENT : writer.getStorageStrategy();
+    this.cleanUpLocations = Lists.newArrayList();
   }
 
   @Override
@@ -126,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     conf = new Configuration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+    fs = FileSystem.get(conf);
     blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
     pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
     dictionaryPageSize= Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE));
@@ -363,7 +370,19 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     // we wait until there is at least one record before creating the parquet file
     if (parquetFileWriter == null) {
       Path path = new Path(location, prefix + "_" + index + ".parquet");
-      parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+      // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
+      Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
+
+      // since parquet writer supports partitions, it means that several output files may be created
+      // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
+      // if table location was created before, we store only files created by this writer and delete them in case of abort
+      addCleanUpLocation(fs, firstCreatedPath);
+
+      // since ParquetFileWriter will overwrite empty output file (append is not supported)
+      // we need to re-apply file permission
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+      storageStrategy.applyToFile(fs, path);
+
       parquetFileWriter.start();
     }
 
@@ -374,6 +393,24 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void abort() throws IOException {
+    List<String> errors = Lists.newArrayList();
+    for (Path location : cleanUpLocations) {
+      try {
+        if (fs.exists(location)) {
+          fs.delete(location, true); // recursive: a clean-up location may be the table folder created by this writer, deleted with its content
+          logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+              location.toUri().getPath(), fs.getUri());
+        }
+      } catch (IOException e) {
+        errors.add(location.toUri().getPath());
+        logger.error("Failed to delete location [{}] on file system [{}].",
+            location, fs.getUri(), e);
+      }
+    }
+    if (!errors.isEmpty()) {
+      throw new IOException(String.format("Failed to delete the following locations %s on file system [%s]" +
+          " during aborting writer", errors, fs.getUri()));
+    }
   }
 
   @Override
@@ -382,4 +419,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     codecFactory.release();
   }
+
+  /**
+   * Adds passed location to the list of locations to be cleaned up in case of abort.
+   * Adds location if:
+   * <li>no locations were added before</li>
+   * <li>first added location is a file</li>
+   *
+   * If first added location is a folder, we don't add other locations (which can be only files),
+   * since this writer was the one to create main folder where files are located,
+   * on abort we'll delete this folder with its content.
+   *
+   * If first location is a file, then we add other files, since this writer didn't create main folder
+   * and on abort we need to delete only created files but not the whole folder.
+   *
+   * @param fs file system where location is created
+   * @param location passed location
+   * @throws IOException in case of errors during check if passed location is a file
+   */
+  private void addCleanUpLocation(FileSystem fs, Path location) throws IOException {
+    if (cleanUpLocations.isEmpty() || fs.isFile(cleanUpLocations.get(0))) {
+      cleanUpLocations.add(location);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 716c56d..522c678 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
@@ -61,6 +61,7 @@ public class ParquetWriter extends AbstractWriter {
           @JsonProperty("child") PhysicalOperator child,
           @JsonProperty("location") String location,
           @JsonProperty("partitionColumns") List<String> partitionColumns,
+          @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
           @JsonProperty("storage") StoragePluginConfig storageConfig,
           @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
 
@@ -69,6 +70,7 @@ public class ParquetWriter extends AbstractWriter {
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
     this.partitionColumns = partitionColumns;
+    setStorageStrategy(storageStrategy);
   }
 
   public ParquetWriter(PhysicalOperator child,
@@ -109,7 +111,9 @@ public class ParquetWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new ParquetWriter(child, location, partitionColumns, formatPlugin);
+    ParquetWriter writer = new ParquetWriter(child, location, partitionColumns, formatPlugin);
+    writer.setStorageStrategy(getStorageStrategy());
+    return writer;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 8a74b49..d65a3eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.StringOutputRecordWriter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -37,6 +37,10 @@ import com.google.common.base.Joiner;
 public class DrillTextRecordWriter extends StringOutputRecordWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
 
+  private final StorageStrategy storageStrategy;
+
+  private Path cleanUpLocation;
+
   private String location;
   private String prefix;
 
@@ -52,8 +56,9 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
   private StringBuilder currentRecord; // contains the current record separated by field delimiter
 
-  public DrillTextRecordWriter(BufferAllocator allocator) {
+  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
     super(allocator);
+    this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy;
   }
 
   @Override
@@ -79,7 +84,17 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
     // open a new file for writing data with new schema
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
+      // drill text writer does not support partitions, so only one file can be created
+      // and thus only one location should be deleted in case of abort
+      // to ensure that our writer was the first to create output file,
+      // we create empty output file first and fail if file exists
+      cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName);
+
+      // since empty output file will be overwritten (some file systems may restrict append option)
+      // we need to re-apply file permission
       DataOutputStream fos = fs.create(fileName);
+      storageStrategy.applyToFile(fs, fileName);
+
       stream = new PrintStream(fos);
       logger.debug("Created file: {}", fileName);
     } catch (IOException ex) {
@@ -160,12 +175,10 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
 
   @Override
   public void abort() throws IOException {
-    cleanup();
-    try {
-      fs.delete(new Path(location), true);
-    } catch (IOException ex) {
-      logger.error("Abort failed. There could be leftover output files");
-      throw ex;
+    if (cleanUpLocation != null) {
+      fs.delete(cleanUpLocation, true);
+      logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
+          cleanUpLocation.toUri().getPath(), fs.getUri());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 01e4be0..735ba2f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -203,7 +203,7 @@ drill.exec: {
   scan: {
     threadpool_size: 8,
     decode_threadpool_size: 1
-  }
+  },
   udf: {
     retry-attempts: 5,
     directory: {
@@ -227,7 +227,11 @@ drill.exec: {
       registry: ${drill.exec.udf.directory.base}"/registry",
       tmp: ${drill.exec.udf.directory.base}"/tmp"
     }
-  }
+  },
+  # Temporary table can be created ONLY in default temporary workspace.
+  # Full workspace name should be indicated (including schema and workspace separated by dot).
+  # Workspace MUST be file-based and writable. Workspace name is case-sensitive.
+  default_temporary_workspace: "dfs.tmp"
 }
 
 drill.jdbc: {

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 93916e9..fb84088 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -84,6 +84,7 @@ public class BaseTestQuery extends ExecTest {
     {
       put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
       put(ExecConstants.HTTP_ENABLE, "false");
+      put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, TEMP_SCHEMA);
     }
   };
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
index e9a38b0..acbf2e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -177,7 +177,7 @@ public class TestDropTable extends PlanTestBase {
 
   @Test // DRILL-4673
   public void testDropTableIfExistsWhileTableExists() throws Exception {
-    final String existentTableName = "test_table";
+    final String existentTableName = "test_table_exists";
     test("use dfs_test.tmp");
 
     // successful dropping of existent table
@@ -192,7 +192,7 @@ public class TestDropTable extends PlanTestBase {
 
   @Test // DRILL-4673
   public void testDropTableIfExistsWhileTableDoesNotExist() throws Exception {
-    final String nonExistentTableName = "test_table";
+    final String nonExistentTableName = "test_table_not_exists";
     test("use dfs_test.tmp");
 
     // dropping of non existent table without error
@@ -200,7 +200,7 @@ public class TestDropTable extends PlanTestBase {
         .sqlQuery(String.format(DROP_TABLE_IF_EXISTS, nonExistentTableName))
         .unOrdered()
         .baselineColumns("ok", "summary")
-        .baselineValues(true, String.format("Table [%s] not found", nonExistentTableName))
+        .baselineValues(false, String.format("Table [%s] not found", nonExistentTableName))
         .go();
   }
 
@@ -216,7 +216,7 @@ public class TestDropTable extends PlanTestBase {
           .sqlQuery(String.format(DROP_TABLE_IF_EXISTS, viewName))
           .unOrdered()
           .baselineColumns("ok", "summary")
-          .baselineValues(true, String.format("Table [%s] not found", viewName))
+          .baselineValues(false, String.format("Table [%s] not found", viewName))
           .go();
     } finally {
       test(String.format(DROP_VIEW_IF_EXISTS, viewName));

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
new file mode 100644
index 0000000..f5d45b0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.util.TestUtilities;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(JMockit.class)
+public class TemporaryTablesAutomaticDropTest extends BaseTestQuery {
+
+  private static final String session_id = "sessionId";
+
+  @Before
+  public void init() throws Exception {
+    new MockUp<UUID>() {
+      @Mock
+      public UUID randomUUID() {
+        return UUID.nameUUIDFromBytes(session_id.getBytes());
+      }
+    };
+    updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties()));
+  }
+
+  @Test
+  public void testAutomaticDropWhenClientIsClosed() throws Exception {
+    File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("client_closed",
+            getDfsTestTmpSchemaLocation());
+    updateClient("new_client");
+    assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
+  }
+
+  @Test
+  public void testAutomaticDropWhenDrillbitIsClosed() throws Exception {
+    File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("drillbit_closed",
+            getDfsTestTmpSchemaLocation());
+    bits[0].close();
+    assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
+  }
+
+  @Test
+  public void testAutomaticDropOfSeveralSessionTemporaryLocations() throws Exception {
+    File firstSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("first_location",
+            getDfsTestTmpSchemaLocation());
+    StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    String tempDir = TestUtilities.createTempDir();
+    try {
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tempDir);
+      File secondSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("second_location", tempDir);
+      updateClient("new_client");
+      assertFalse("First session temporary location should be absent", firstSessionTemporaryLocation.exists());
+      assertFalse("Second session temporary location should be absent", secondSessionTemporaryLocation.exists());
+    } finally {
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, getDfsTestTmpSchemaLocation());
+    }
+  }
+
+  private File createAndCheckSessionTemporaryLocation(String suffix, String schemaLocation) throws Exception {
+    String temporaryTableName = "temporary_table_automatic_drop_" + suffix;
+    test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    File sessionTemporaryLocation = new File(schemaLocation,
+            UUID.nameUUIDFromBytes(session_id.getBytes()).toString());
+    assertTrue("Session temporary location should exist", sessionTemporaryLocation.exists());
+    return sessionTemporaryLocation;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
index 43d8d57..5bf55af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -143,7 +143,7 @@ public class TestBaseViewSupport extends BaseTestQuery {
           .sqlQuery(String.format("DROP VIEW IF EXISTS %s", viewFullName))
           .unOrdered()
           .baselineColumns("ok", "summary")
-          .baselineValues(true, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
+          .baselineValues(false, String.format("View [%s] not found in schema [%s].", viewName, finalSchema))
           .go();
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
new file mode 100644
index 0000000..93c8cad
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.util.TestUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(JMockit.class)
+public class TestCTTAS extends BaseTestQuery {
+
+  private static final UUID session_id = UUID.nameUUIDFromBytes("sessionId".getBytes());
+  private static final String test_schema = "dfs_test";
+  private static final String temp2_wk = "tmp2";
+  private static final String temp2_schema = String.format("%s.%s", test_schema, temp2_wk);
+
+  private static FileSystem fs;
+  private static FsPermission expectedFolderPermission;
+  private static FsPermission expectedFilePermission;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    MockUp<UUID> uuidMockUp = mockRandomUUID(session_id);
+    updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties()));
+    uuidMockUp.tearDown();
+
+    StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(test_schema).getConfig();
+    pluginConfig.workspaces.put(temp2_wk, new WorkspaceConfig(TestUtilities.createTempDir(), true, null));
+    pluginRegistry.createOrUpdate(test_schema, pluginConfig, true);
+
+    fs = FileSystem.get(new Configuration());
+    expectedFolderPermission = new FsPermission(StorageStrategy.TEMPORARY.getFolderPermission());
+    expectedFilePermission = new FsPermission(StorageStrategy.TEMPORARY.getFilePermission());
+  }
+
+  private static MockUp<UUID> mockRandomUUID(final UUID uuid) {
+    return new MockUp<UUID>() {
+      @Mock
+      public UUID randomUUID() {
+        return uuid;
+      }
+    };
+  }
+
+  @Test // covers both the unqualified and the workspace-qualified table name syntax
+  public void testSyntax() throws Exception {
+    test("create TEMPORARY table temporary_keyword as select 1 from (values(1))");
+    test("create TEMPORARY table %s.temporary_keyword_with_wk as select 1 from (values(1))", TEMP_SCHEMA);
+  }
+
+  @Test
+  public void testCreateTableWithDifferentStorageFormats() throws Exception {
+    List<String> storageFormats = Lists.newArrayList("parquet", "json", "csvh");
+
+    try {
+      for (String storageFormat : storageFormats) {
+        String temporaryTableName = "temp_" + storageFormat;
+        mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+        test("alter session set `store.format`='%s'", storageFormat);
+        test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+        checkPermission(temporaryTableName);
+
+        testBuilder()
+            .sqlQuery("select * from %s", temporaryTableName)
+            .unOrdered()
+            .baselineColumns("c1")
+            .baselineValues("A")
+            .go();
+
+        testBuilder()
+            .sqlQuery("select * from %s", temporaryTableName)
+            .unOrdered()
+            .sqlBaselineQuery("select * from %s.%s", TEMP_SCHEMA, temporaryTableName)
+            .go();
+      }
+    } finally {
+      test("alter session reset `store.format`");
+    }
+  }
+
+  @Test
+  public void testTemporaryTablesCaseInsensitivity() throws Exception {
+    String temporaryTableName = "tEmP_InSeNSiTiVe";
+    List<String> temporaryTableNames = Lists.newArrayList(
+        temporaryTableName,
+        temporaryTableName.toLowerCase(),
+        temporaryTableName.toUpperCase());
+
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+    for (String tableName : temporaryTableNames) {
+      testBuilder()
+          .sqlQuery("select * from %s", tableName)
+          .unOrdered()
+          .baselineColumns("c1")
+          .baselineValues("A")
+          .go();
+    }
+  }
+
+  @Test
+  public void testPartitionByWithTemporaryTables() throws Exception {
+    String temporaryTableName = "temporary_table_with_partitions";
+    mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes()));
+    test("create TEMPORARY table %s partition by (c1) as select * from (" +
+        "select 'A' as c1 from (values(1)) union all select 'B' as c1 from (values(1))) t", temporaryTableName);
+    checkPermission(temporaryTableName);
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreationOutsideOfDefaultTemporaryWorkspace() throws Exception {
+    try {
+      String temporaryTableName = "temporary_table_outside_of_default_workspace";
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: Temporary tables are not allowed to be created outside of default temporary workspace [%s].",
+          TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsWithoutSchema() throws Exception {
+    String temporaryTableName = "temporary_table_exists_without_schema";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+         "VALIDATION ERROR: A table or view with given name [%s]" +
+             " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsCaseInsensitive() throws Exception {
+    String temporaryTableName = "temporary_table_exists_case_insensitive"; // unique: tests share one session
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName.toUpperCase());
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", temporaryTableName.toUpperCase(), TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenTemporaryTableExistsWithSchema() throws Exception {
+    String temporaryTableName = "temporary_table_exists_with_schema";
+    try {
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+      test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenPersistentTableExists() throws Exception {
+    String persistentTableName = "persistent_table_exists";
+    try {
+      test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, persistentTableName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", persistentTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", persistentTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateWhenViewExists() throws Exception {
+    String viewName = "view_exists";
+    try {
+      test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, viewName);
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", viewName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", viewName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreatePersistentTableWhenTemporaryTableExists() throws Exception {
+    String temporaryTableName = "temporary_table_exists_before_persistent";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A table or view with given name [%s]" +
+              " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testCreateViewWhenTemporaryTableExists() throws Exception {
+    String temporaryTableName = "temporary_table_exists_before_view";
+    try {
+      test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+      test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: A non-view table with given name [%s] already exists in schema [%s]",
+          temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  @Test
+  public void testTemporaryAndPersistentTablesPriority() throws Exception {
+    String name = "temporary_and_persistent_table";
+    test("use %s", temp2_schema);
+    test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+    test("create table %s as select 'persistent_table' as c1 from (values(1))", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("temporary_table")
+        .go();
+
+    testBuilder()
+        .sqlQuery("select * from %s.%s", temp2_schema, name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("persistent_table")
+        .go();
+
+    test("drop table %s", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("persistent_table")
+        .go();
+  }
+
+  @Test
+  public void testTemporaryTableAndViewPriority() throws Exception {
+    String name = "temporary_table_and_view";
+    test("use %s", temp2_schema);
+    test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name);
+    test("create view %s as select 'view' as c1 from (values(1))", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("temporary_table")
+        .go();
+
+    testBuilder()
+        .sqlQuery("select * from %s.%s", temp2_schema, name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("view")
+        .go();
+
+    test("drop table %s", name);
+
+    testBuilder()
+        .sqlQuery("select * from %s", name)
+        .unOrdered()
+        .baselineColumns("c1")
+        .baselineValues("view")
+        .go();
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testTemporaryTablesInViewDefinitions() throws Exception {
+    String temporaryTableName = "temporary_table_for_view_definition";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    try {
+      test("create view %s.view_with_temp_table as select * from %s", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+          "VALIDATION ERROR: Temporary tables usage is disallowed. Used temporary table name: [%s]", temporaryTableName)));
+      throw e;
+    }
+  }
+
+  @Test
+  public void testManualDropWithoutSchema() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_without_schema";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop table %s", temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+        .go();
+  }
+
+  @Test
+  public void testManualDropWithSchema() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_with_schema";
+    test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop table %s.%s", TEMP_SCHEMA, temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName))
+        .go();
+  }
+
+  @Test
+  public void testDropTemporaryTableAsViewWithoutException() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_like_view_without_exception";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    testBuilder()
+        .sqlQuery("drop view if exists %s.%s", TEMP_SCHEMA, temporaryTableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(false, String.format("View [%s] not found in schema [%s].",
+            temporaryTableName, TEMP_SCHEMA))
+        .go();
+  }
+
+  @Test(expected = UserRemoteException.class)
+  public void testDropTemporaryTableAsViewWithException() throws Exception {
+    String temporaryTableName = "temporary_table_to_drop_like_view_with_exception";
+    test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName);
+
+    try {
+      test("drop view %s.%s", TEMP_SCHEMA, temporaryTableName);
+    } catch (UserRemoteException e) {
+      assertThat(e.getMessage(), containsString(String.format(
+              "VALIDATION ERROR: Unknown view [%s] in schema [%s]", temporaryTableName, TEMP_SCHEMA)));
+      throw e;
+    }
+  }
+
+  private void checkPermission(String tmpTableName) throws IOException {
+    File[] files = findTemporaryTableLocation(tmpTableName);
+    assertEquals("Only one directory should match temporary table name " + tmpTableName, 1, files.length);
+    Path tmpTablePath = new Path(files[0].toURI().getPath());
+    assertEquals("Directory permission should match",
+        expectedFolderPermission, fs.getFileStatus(tmpTablePath).getPermission());
+    RemoteIterator<LocatedFileStatus> fileIterator = fs.listFiles(tmpTablePath, false);
+    while (fileIterator.hasNext()) {
+      assertEquals("File permission should match", expectedFilePermission, fileIterator.next().getPermission());
+    }
+  }
+
+  private File[] findTemporaryTableLocation(String tableName) throws IOException {
+    File sessionTempLocation = new File(getDfsTestTmpSchemaLocation(), session_id.toString());
+    Path sessionTempLocationPath = new Path(sessionTempLocation.toURI().getPath());
+    assertTrue("Session temporary location must exist", fs.exists(sessionTempLocationPath));
+    assertEquals("Session temporary location permission should match",
+        expectedFolderPermission, fs.getFileStatus(sessionTempLocationPath).getPermission());
+    final String tableUUID =  UUID.nameUUIDFromBytes(tableName.getBytes()).toString();
+    return sessionTempLocation.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File path) {
+        return path.isDirectory() && path.getName().equals(tableUUID);
+      }
+    });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
new file mode 100644
index 0000000..6a377ec
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests permissions applied by {@link StorageStrategy} and path removal when the file system is closed. */
+public class StorageStrategyTest {
+
+  private static final Configuration CONFIGURATION = new Configuration();
+  private static final FsPermission FULL_PERMISSION = new FsPermission("777");
+  private static final StorageStrategy PERSISTENT_STRATEGY = new StorageStrategy("775", "644", false);
+  private static final StorageStrategy TEMPORARY_STRATEGY = new StorageStrategy("700", "600", true);
+  private FileSystem fs;
+
+  @Before
+  public void setup() throws Exception {
+    initFileSystem();
+  }
+
+  /** File under two new folders, persistent strategy: first created folder is returned and kept on close. */
+  @Test
+  public void testPermissionAndDeleteOnExitFalseForFileWithParent() throws Exception {
+    Path root = prepareStorageDirectory();
+    Path target = addNLevelsAndFile(root, 2, true);
+    Path expectedFirstCreated = addNLevelsAndFile(root, 1, false);
+    Path actualFirstCreated = PERSISTENT_STRATEGY.createFileAndApply(fs, target);
+
+    assertEquals("Path should match", expectedFirstCreated, actualFirstCreated);
+    checkPathAndPermission(root, target, true, 2, PERSISTENT_STRATEGY);
+    checkDeleteOnExit(expectedFirstCreated, true);
+  }
+
+  /** File under two new folders, temporary strategy: first created folder is returned and removed on close. */
+  @Test
+  public void testPermissionAndDeleteOnExitTrueForFileWithParent() throws Exception {
+    Path root = prepareStorageDirectory();
+    Path target = addNLevelsAndFile(root, 2, true);
+    Path expectedFirstCreated = addNLevelsAndFile(root, 1, false);
+    Path actualFirstCreated = TEMPORARY_STRATEGY.createFileAndApply(fs, target);
+
+    assertEquals("Path should match", expectedFirstCreated, actualFirstCreated);
+    checkPathAndPermission(root, target, true, 2, TEMPORARY_STRATEGY);
+    checkDeleteOnExit(expectedFirstCreated, false);
+  }
+
+  /** File in an existing folder, persistent strategy: the file itself is returned and kept on close. */
+  @Test
+  public void testPermissionAndDeleteOnExitFalseForFileOnly() throws Exception {
+    Path root = prepareStorageDirectory();
+    Path target = addNLevelsAndFile(root, 0, true);
+    Path actualCreated = PERSISTENT_STRATEGY.createFileAndApply(fs, target);
+
+    assertEquals("Path should match", target, actualCreated);
+    checkPathAndPermission(root, target, true, 0, PERSISTENT_STRATEGY);
+    checkDeleteOnExit(target, true);
+  }
+
+  /** File in an existing folder, temporary strategy: the file itself is returned and removed on close. */
+  @Test
+  public void testPermissionAndDeleteOnExitTrueForFileOnly() throws Exception {
+    Path root = prepareStorageDirectory();
+    Path target = addNLevelsAndFile(root, 0, true);
+    Path actualCreated = TEMPORARY_STRATEGY.createFileAndApply(fs, target);
+
+    assertEquals("Path should match", target, actualCreated);
+    checkPathAndPermission(root, target, true, 0, TEMPORARY_STRATEGY);
+    checkDeleteOnExit(target, false);
+  }
+
+  /** Creating an already existing file must fail with an IOException that names the file and file system. */
+  @Test(expected = IOException.class)
+  public void testFailureOnExistentFile() throws Exception {
+    Path existingFile = addNLevelsAndFile(prepareStorageDirectory(), 0, true);
+    fs.createNewFile(existingFile);
+    assertTrue("File should exist", fs.exists(existingFile));
+    try {
+      PERSISTENT_STRATEGY.createFileAndApply(fs, existingFile);
+    } catch (IOException e) {
+      assertEquals("Error message should match", String.format("File [%s] already exists on file system [%s].",
+          existingFile.toUri().getPath(), fs.getUri()), e.getMessage());
+      throw e; // rethrow so the expected-exception check of @Test still applies
+    }
+  }
+
+  /** Two new folders, persistent strategy: first created folder is returned and kept on close. */
+  @Test
+  public void testCreatePathAndDeleteOnExitFalse() throws Exception {
+    Path root = prepareStorageDirectory();
+    Path target = addNLevelsAndFile(root, 2, false);
+    Path expectedFirstCreated = addNLevelsAndFile(root, 1, false);
+    Path actualFirstCreated = PERSISTENT_STRATEGY.createPathAndApply(fs, target);
+
+    assertEquals("Path should match", expectedFirstCreated, actualFirstCreated);
+    checkPathAndPermission(root, target, false, 2, PERSISTENT_STRATEGY);
+    checkDeleteOnExit(expectedFirstCreated, true);
+  }
+
+  /** Two new folders, temporary strategy: first created folder is returned and removed on close. */
+  @Test
+  public void testCreatePathAndDeleteOnExitTrue() throws Exception {
+    Path root = prepareStorageDirectory();
+    Path target = addNLevelsAndFile(root, 2, false);
+    Path expectedFirstCreated = addNLevelsAndFile(root, 1, false);
+    Path actualFirstCreated = TEMPORARY_STRATEGY.createPathAndApply(fs, target);
+
+    assertEquals("Path should match", expectedFirstCreated, actualFirstCreated);
+    checkPathAndPermission(root, target, false, 2, TEMPORARY_STRATEGY);
+    checkDeleteOnExit(expectedFirstCreated, false);
+  }
+
+  /** Existing folder: nothing is created, so null is returned and the folder permission stays unchanged. */
+  @Test
+  public void testCreateNoPath() throws Exception {
+    Path existingDir = prepareStorageDirectory();
+    Path createdParentPath = TEMPORARY_STRATEGY.createPathAndApply(fs, existingDir);
+
+    assertNull("Path should be null", createdParentPath);
+    assertEquals("Permission should match", FULL_PERMISSION, fs.getFileStatus(existingDir).getPermission());
+  }
+
+  /** Temporary strategy applied to an existing file: permission is changed and the file is removed on close. */
+  @Test
+  public void testStrategyForExistingFile() throws Exception {
+    Path existingFile = addNLevelsAndFile(prepareStorageDirectory(), 0, true);
+    fs.createNewFile(existingFile);
+    fs.setPermission(existingFile, FULL_PERMISSION);
+    assertTrue("File should exist", fs.exists(existingFile));
+    assertEquals("Permission should match", FULL_PERMISSION, fs.getFileStatus(existingFile).getPermission());
+
+    TEMPORARY_STRATEGY.applyToFile(fs, existingFile);
+
+    FsPermission expectedPermission = new FsPermission(TEMPORARY_STRATEGY.getFilePermission());
+    assertEquals("Permission should match", expectedPermission, fs.getFileStatus(existingFile).getPermission());
+    checkDeleteOnExit(existingFile, false);
+  }
+
+  /** Creates a temporary directory (deleted on JVM exit), gives it full permission and returns its path. */
+  private Path prepareStorageDirectory() throws IOException {
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+    Path tempDirPath = new Path(tempDir.toURI().getPath());
+    fs.setPermission(tempDirPath, FULL_PERMISSION);
+    return tempDirPath;
+  }
+
+  /** Closes the current file system, if there is one, and obtains a file system for the test configuration. */
+  private void initFileSystem() throws IOException {
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch (Exception ignored) {
+        // best effort: errors on close are deliberately ignored
+      }
+    }
+    fs = FileSystem.get(CONFIGURATION);
+  }
+
+  /** Builds a path by appending level1..levelN folders to the initial path and, if requested, test_file.txt. */
+  private Path addNLevelsAndFile(Path initialPath, int levels, boolean addFile) {
+    Path current = initialPath;
+    for (int level = 1; level <= levels; level++) {
+      current = new Path(current, "level" + level);
+    }
+    return addFile ? new Path(current, "test_file.txt") : current;
+  }
+
+  /**
+   * Asserts the type of the resulting path, that the initial folder kept full permission, and that
+   * the file (if any) and each level folder carry the permissions defined by the given strategy.
+   */
+  private void checkPathAndPermission(Path initialPath, Path resultPath, boolean isFile, int levels,
+                                      StorageStrategy storageStrategy) throws IOException {
+    assertEquals("Path type should match", isFile, fs.isFile(resultPath));
+    assertEquals("Permission should match", FULL_PERMISSION, fs.getFileStatus(initialPath).getPermission());
+
+    if (isFile) {
+      FsPermission filePermission = new FsPermission(storageStrategy.getFilePermission());
+      assertEquals("Permission should match", filePermission, fs.getFileStatus(resultPath).getPermission());
+    }
+    FsPermission folderPermission = new FsPermission(storageStrategy.getFolderPermission());
+    Path folder = initialPath;
+    for (int level = 1; level <= levels; level++) {
+      folder = new Path(folder, "level" + level);
+      assertEquals("Permission should match", folderPermission, fs.getFileStatus(folder).getPermission());
+    }
+  }
+
+  /** Asserts the path exists, reopens the file system and asserts the path is present only if expected. */
+  private void checkDeleteOnExit(Path path, boolean isPresent) throws IOException {
+    assertTrue("Path should be present", fs.exists(path));
+    initFileSystem(); // close and reopen the file system before re-checking presence
+    assertEquals("Path existence flag should match", isPresent, fs.exists(path));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index 7b977e2..35ca26b 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -9,7 +9,7 @@
           writable: false
         },
         "tmp" : {
-          location: "/tmp/drilltest",
+          location: "/tmp",
           writable: true
         }
       },

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index fbacd23..cd97ab7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -176,6 +176,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   }
 
   /**
+   * Closes all resources connected with the current session.
+   * The default implementation does nothing.
+   */
+  public void closeSession() {
+  }
+
+  /**
    * Connection consumer wants to close connection. Initiate connection close
    * and complete. This is a blocking call that ensures that the connection is
    * closed before returning. As part of this call, the channel close handler

http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index c360e51..cdb9c07 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -164,7 +164,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       }
 
       final ChannelClosedException ex = future.cause() != null ? new ChannelClosedException(msg, future.cause()) : new ChannelClosedException(msg);
-      clientConnection.channelClosed(ex);
+      try {
+        clientConnection.closeSession();
+      } finally {
+        clientConnection.channelClosed(ex);
+      }
     }
 
   }


Mime
View raw message