Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8027F200C29 for ; Tue, 24 Jan 2017 07:15:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7D8FD160B49; Tue, 24 Jan 2017 06:15:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 608EC160B53 for ; Tue, 24 Jan 2017 07:15:23 +0100 (CET) Received: (qmail 89092 invoked by uid 500); 24 Jan 2017 06:15:22 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 89039 invoked by uid 99); 24 Jan 2017 06:15:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2017 06:15:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 856FADFC9D; Tue, 24 Jan 2017 06:15:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jni@apache.org To: commits@drill.apache.org Date: Tue, 24 Jan 2017 06:15:21 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] drill git commit: DRILL-4956: Temporary tables support archived-at: Tue, 24 Jan 2017 06:15:25 -0000 Repository: drill Updated Branches: refs/heads/master 8a4d7a994 -> 2af709f43 http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java new file mode 100644 index 0000000..a125bae --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; + +import java.io.IOException; +import java.util.List; + +/** Contains list of parameters that will be used to store path / files on file system. */ +public class StorageStrategy { + + /** + * Primary is used for persistent tables. + * For directories: drwxrwxr-x (owner and group have full access, others can read and execute). + * For files: -rw-r--r-- (owner can read and write, group and others can read). + * Folders and files are not deleted on file system close. + */ + public static final StorageStrategy PERSISTENT = new StorageStrategy("775", "644", false); + + /** + * Primary is used for temporary tables. + * For directories: drwx------ (owner has full access, group and others have no access). + * For files: -rw------- (owner can read and write, group and others have no access). + * Folders and files are deleted on file system close. 
+ */ + public static final StorageStrategy TEMPORARY = new StorageStrategy("700", "600", true); + + private final String folderPermission; + private final String filePermission; + private final boolean deleteOnExit; + + @JsonCreator + public StorageStrategy(@JsonProperty("folderPermission") String folderPermission, + @JsonProperty("filePermission") String filePermission, + @JsonProperty("deleteOnExit") boolean deleteOnExit) { + this.folderPermission = folderPermission; + this.filePermission = filePermission; + this.deleteOnExit = deleteOnExit; + } + + public String getFolderPermission() { + return folderPermission; + } + + public String getFilePermission() { return filePermission; } + + public boolean isDeleteOnExit() { + return deleteOnExit; + } + + /** + * Creates passed path on appropriate file system. + * Before creation checks which parent directories do not exist. + * Applies storage strategy rules to all newly created directories. + * Will return first created path or null if path already existed. + * + * Case 1: /a/b -> already exists, attempt to create /a/b/c/d + * Will create path and return /a/b/c. + * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/d + * Will create path and return /a/b/c/d. + * Case 3: /a/b/c/d -> already exists, will return null. + * + * @param fs file system where file should be located + * @param path location path + * @return first created parent path or file + * @throws IOException is thrown in case of problems while creating path, setting permission + * or adding path to delete on exit list + */ + public Path createPathAndApply(FileSystem fs, Path path) throws IOException { + List<Path> locations = getNonExistentLocations(fs, path); + if (locations.isEmpty()) { + return null; + } + fs.mkdirs(path); + for (Path location : locations) { + applyStrategy(fs, location, folderPermission, deleteOnExit); + } + return locations.get(locations.size() - 1); + } + + /** + * Creates passed file on appropriate file system. 
+ * Before creation checks which parent directories do not exist. + * Applies storage strategy rules to all newly created directories and file. + * Will return first created parent path or file if no new parent paths created. + * + * Case 1: /a/b -> already exists, attempt to create /a/b/c/some_file.txt + * Will create file and return /a/b/c. + * Case 2: /a/b/c -> already exists, attempt to create /a/b/c/some_file.txt + * Will create file and return /a/b/c/some_file.txt. + * Case 3: /a/b/c/some_file.txt -> already exists, will fail. + * + * @param fs file system where file should be located + * @param file file path + * @return first created parent path or file + * @throws IOException is thrown in case of problems while creating path, setting permission + * or adding path to delete on exit list + */ + public Path createFileAndApply(FileSystem fs, Path file) throws IOException { + List<Path> locations = getNonExistentLocations(fs, file.getParent()); + if (!fs.createNewFile(file)) { + throw new IOException(String.format("File [%s] already exists on file system [%s].", + file.toUri().getPath(), fs.getUri())); + } + applyToFile(fs, file); + + if (locations.isEmpty()) { + return file; + } + + for (Path location : locations) { + applyStrategy(fs, location, folderPermission, deleteOnExit); + } + return locations.get(locations.size() - 1); + } + + /** + * Applies storage strategy to file: + * sets permission and adds to file system delete on exit list if needed. + * + * @param fs file system + * @param file path to file + * @throws IOException is thrown in case of problems while setting permission + * or adding file to delete on exit list + */ + public void applyToFile(FileSystem fs, Path file) throws IOException { + applyStrategy(fs, file, filePermission, deleteOnExit); + } + + /** + * Returns list of parent locations that do not exist, including initial location. 
+ * First in the list will be initial location, + * last in the list will be last parent location that does not exist. + * If all locations exist, empty list will be returned. + * + * Case 1: if /a/b exists and passed location is /a/b/c/d, + * will return list with two elements: 0 -> /a/b/c/d, 1 -> /a/b/c + * Case 2: if /a/b exists and passed location is /a/b, will return empty list. + * + * @param fs file system where locations should be located + * @param path location path + * @return list of locations that do not exist + * @throws IOException in case of troubles accessing file system + */ + private List<Path> getNonExistentLocations(FileSystem fs, Path path) throws IOException { + List<Path> locations = Lists.newArrayList(); + Path starting = path; + while (starting != null && !fs.exists(starting)) { + locations.add(starting); + starting = starting.getParent(); + } + return locations; + } + + /** + * Applies storage strategy to passed path on passed file system. + * Sets appropriate permission + * and adds to file system delete on exit list if needed. 
+ * + * @param fs file system where path is located + * @param path path location + * @param permission permission to be applied + * @param deleteOnExit if to delete path on exit + * @throws IOException is thrown in case of problems while setting permission + * or adding path to delete on exit list + */ + private void applyStrategy(FileSystem fs, Path path, String permission, boolean deleteOnExit) throws IOException { + fs.setPermission(path, new FsPermission(permission)); + if (deleteOnExit) { + fs.deleteOnExit(path); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java index e502e99..2110f38 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -63,8 +63,8 @@ public class SubSchemaWrapper extends AbstractSchema { } @Override - public CreateTableEntry createNewTable(String tableName, List partitionColumns) { - return innerSchema.createNewTable(tableName, partitionColumns); + public CreateTableEntry createNewTable(String tableName, List partitionColumns, StorageStrategy storageStrategy) { + return innerSchema.createNewTable(tableName, partitionColumns, storageStrategy); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java index e0f5438..3a89591 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -288,6 +288,9 @@ public class FileSelection { } final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().toString()); logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS)); + if (fileSel == null) { + return null; + } fileSel.setHadWildcard(hasWildcard); return fileSel; http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java index 526dfb1..e3e01c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -24,12 +24,10 @@ import java.util.Map; import java.util.Set; import org.apache.calcite.schema.Function; -import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.planner.logical.CreateTableEntry; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.PartitionNotFoundException; @@ -150,8 +148,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{ } @Override - public CreateTableEntry createNewTable(String tableName, List partitionColumns) { - return defaultSchema.createNewTable(tableName, partitionColumns); + public CreateTableEntry createNewTable(String tableName, List partitionColumns, StorageStrategy storageStrategy) { + return defaultSchema.createNewTable(tableName, partitionColumns, storageStrategy); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java index dac313b..8416ed8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -53,13 +53,13 @@ import org.apache.drill.exec.dotdrill.DotDrillFile; import org.apache.drill.exec.dotdrill.DotDrillType; import org.apache.drill.exec.dotdrill.DotDrillUtil; import org.apache.drill.exec.dotdrill.View; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.planner.logical.CreateTableEntry; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.logical.DrillTranslatableTable; import org.apache.drill.exec.planner.logical.DrillViewTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry; -import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.PartitionNotFoundException; @@ -522,7 +522,6 @@ public class WorkspaceSchemaFactory { } catch (UnsupportedOperationException e) { logger.debug("The filesystem for this workspace does not support this operation.", e); } - return tables.get(tableKey); } @@ -540,7 +539,7 @@ public class WorkspaceSchemaFactory { } @Override - public CreateTableEntry createNewTable(String tableName, List partitonColumns) { + public CreateTableEntry createNewTable(String tableName, List partitionColumns, StorageStrategy storageStrategy) { String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val; FormatPlugin formatPlugin = plugin.getFormatPlugin(storage); if (formatPlugin == null) { @@ -553,7 +552,8 @@ public class WorkspaceSchemaFactory { (FileSystemConfig) plugin.getConfig(), formatPlugin, config.getLocation() + Path.SEPARATOR + tableName, - partitonColumns); + partitionColumns, + storageStrategy); } @Override 
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java index db22568..52ce8b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,11 +21,11 @@ import java.io.IOException; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractWriter; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.store.StoragePluginRegistry; import com.fasterxml.jackson.annotation.JacksonInject; @@ -48,6 +48,7 @@ public class EasyWriter extends AbstractWriter { @JsonProperty("child") PhysicalOperator child, @JsonProperty("location") String location, @JsonProperty("partitionColumns") List partitionColumns, + @JsonProperty("storageStrategy") StorageStrategy storageStrategy, @JsonProperty("storage") StoragePluginConfig storageConfig, @JsonProperty("format") FormatPluginConfig formatConfig, @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException { @@ -57,6 +58,7 @@ public class EasyWriter extends AbstractWriter { 
Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); this.location = location; this.partitionColumns = partitionColumns; + setStorageStrategy(storageStrategy); } public EasyWriter(PhysicalOperator child, @@ -92,7 +94,9 @@ public class EasyWriter extends AbstractWriter { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new EasyWriter(child, location, partitionColumns, formatPlugin); + EasyWriter writer = new EasyWriter(child, location, partitionColumns, formatPlugin); + writer.setStorageStrategy(getStorageStrategy()); + return writer; } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 30c248e..58ca95f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -83,7 +83,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin { options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY))); options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS))); - RecordWriter recordWriter = new JsonRecordWriter(); + RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy()); recordWriter.init(options); return recordWriter; http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java index f27e04c..c37da8a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; -import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.store.EventBasedRecordWriter; import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; @@ -46,6 +46,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class); private static final String LINE_FEED = String.format("%n"); + private Path cleanUpLocation; private String location; private String prefix; @@ -58,11 +59,13 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr private FSDataOutputStream stream = null; private final JsonFactory factory = new JsonFactory(); + private final StorageStrategy storageStrategy; // Record write status private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called - public JsonRecordWriter(){ + public JsonRecordWriter(StorageStrategy storageStrategy){ + this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy; } @Override @@ -81,7 +84,17 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr Path fileName = new Path(location, prefix + "_" + index + "." 
+ extension); try { + // json writer does not support partitions, so only one file can be created + // and thus only one location should be deleted in case of abort + // to ensure that our writer was the first to create output file, + // we create empty output file first and fail if file exists + cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName); + + // since empty output file will be overwritten (some file systems may restrict append option) + // we need to re-apply file permission stream = fs.create(fileName); + storageStrategy.applyToFile(fs, fileName); + JsonGenerator generator = factory.createGenerator(stream).useDefaultPrettyPrinter(); if (uglify) { generator = generator.setPrettyPrinter(new MinimalPrettyPrinter(LINE_FEED)); @@ -238,6 +251,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr @Override public void abort() throws IOException { + if (cleanUpLocation != null) { + fs.delete(cleanUpLocation, true); + logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.", + cleanUpLocation.toUri().getPath(), fs.getUri()); + } } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 6542ad4..a9a30e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -125,7 +125,7 @@ public class TextFormatPlugin extends EasyFormatPlugin extraMetaData = new HashMap<>(); @@ -101,7 +103,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { private BatchSchema batchSchema; private Configuration conf; + private FileSystem fs; private String location; + private List cleanUpLocations; private String prefix; private int index = 0; private OperatorContext oContext; @@ -117,6 +121,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0; this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion()); this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION)); + this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.PERSISTENT : writer.getStorageStrategy(); + this.cleanUpLocations = Lists.newArrayList(); } @Override @@ -126,6 +132,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { conf = new Configuration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY)); + fs = FileSystem.get(conf); blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE)); pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE)); dictionaryPageSize= Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE)); @@ -363,7 +370,19 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { // we wait until there is at least one record before creating the parquet file if (parquetFileWriter == null) { Path path = new Path(location, prefix + "_" + index + ".parquet"); - parquetFileWriter = new ParquetFileWriter(conf, schema, path); + // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists + Path firstCreatedPath = 
storageStrategy.createFileAndApply(fs, path); + + // since parquet reader supports partitions, it means that several output files may be created + // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort + // if table location was created before, we store only files created by this writer and delete them in case of abort + addCleanUpLocation(fs, firstCreatedPath); + + // since ParquetFileWriter will overwrite empty output file (append is not supported) + // we need to re-apply file permission + parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE); + storageStrategy.applyToFile(fs, path); + parquetFileWriter.start(); } @@ -374,6 +393,24 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { @Override public void abort() throws IOException { + List errors = Lists.newArrayList(); + for (Path location : cleanUpLocations) { + try { + if (fs.exists(location)) { + fs.delete(location, false); + logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.", + location.toUri().getPath(), fs.getUri()); + } + } catch (IOException e) { + errors.add(location.toUri().getPath()); + logger.error("Failed to delete location [{}] on file system [{}].", + location, fs.getUri(), e); + } + } + if (!errors.isEmpty()) { + throw new IOException(String.format("Failed to delete the following locations %s on file system [%s]" + + " during aborting writer", errors, fs.getUri())); + } } @Override @@ -382,4 +419,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { codecFactory.release(); } + + /** + * Adds passed location to the list of locations to be cleaned up in case of abort. + * Add locations if: + *

<li>if no locations were added before</li> + * <li>if first location is a file</li> + * + * If first added location is a folder, we don't add other locations (which can be only files), + * since this writer was the one to create main folder where files are located, + * on abort we'll delete this folder with its content. + * + * If first location is a file, then we add other files, since this writer didn't create main folder + * and on abort we need to delete only created files but not the whole folder. + * + * @param fs file system where location is created + * @param location passed location + * @throws IOException in case of errors during check if passed location is a file + */ + private void addCleanUpLocation(FileSystem fs, Path location) throws IOException { + if (cleanUpLocations.isEmpty() || fs.isFile(cleanUpLocations.get(0))) { + cleanUpLocations.add(location); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java index 716c56d..522c678 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,11 +21,11 @@ import java.io.IOException; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractWriter; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.store.StoragePluginRegistry; @@ -61,6 +61,7 @@ public class ParquetWriter extends AbstractWriter { @JsonProperty("child") PhysicalOperator child, @JsonProperty("location") String location, @JsonProperty("partitionColumns") List partitionColumns, + @JsonProperty("storageStrategy") StorageStrategy storageStrategy, @JsonProperty("storage") StoragePluginConfig storageConfig, @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException { @@ -69,6 +70,7 @@ public class ParquetWriter extends AbstractWriter { Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); this.location = location; this.partitionColumns = partitionColumns; + setStorageStrategy(storageStrategy); } public ParquetWriter(PhysicalOperator child, @@ -109,7 +111,9 @@ public class ParquetWriter extends AbstractWriter { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new ParquetWriter(child, location, partitionColumns, formatPlugin); + ParquetWriter writer = new ParquetWriter(child, location, partitionColumns, formatPlugin); + writer.setStorageStrategy(getStorageStrategy()); + return writer; } @Override 
http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java index 8a74b49..d65a3eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.store.StringOutputRecordWriter; import org.apache.drill.exec.vector.complex.reader.FieldReader; @@ -37,6 +37,10 @@ import com.google.common.base.Joiner; public class DrillTextRecordWriter extends StringOutputRecordWriter { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class); + private final StorageStrategy storageStrategy; + + private Path cleanUpLocation; + private String location; private String prefix; @@ -52,8 +56,9 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter { private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called private StringBuilder currentRecord; // contains the current record separated by field delimiter - public DrillTextRecordWriter(BufferAllocator allocator) { + public 
DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) { super(allocator); + this.storageStrategy = storageStrategy == null ? StorageStrategy.PERSISTENT : storageStrategy; } @Override @@ -79,7 +84,17 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter { // open a new file for writing data with new schema Path fileName = new Path(location, prefix + "_" + index + "." + extension); try { + // drill text writer does not support partitions, so only one file can be created + // and thus only one location should be deleted in case of abort + // to ensure that our writer was the first to create output file, + // we create empty output file first and fail if file exists + cleanUpLocation = storageStrategy.createFileAndApply(fs, fileName); + + // since empty output file will be overwritten (some file systems may restrict append option) + // we need to re-apply file permission DataOutputStream fos = fs.create(fileName); + storageStrategy.applyToFile(fs, fileName); + stream = new PrintStream(fos); logger.debug("Created file: {}", fileName); } catch (IOException ex) { @@ -160,12 +175,10 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter { @Override public void abort() throws IOException { - cleanup(); - try { - fs.delete(new Path(location), true); - } catch (IOException ex) { - logger.error("Abort failed. There could be leftover output files"); - throw ex; + if (cleanUpLocation != null) { + fs.delete(cleanUpLocation, true); + logger.info("Aborting writer. 
Location [{}] on file system [{}] is deleted.", + cleanUpLocation.toUri().getPath(), fs.getUri()); } } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 01e4be0..735ba2f 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -203,7 +203,7 @@ drill.exec: { scan: { threadpool_size: 8, decode_threadpool_size: 1 - } + }, udf: { retry-attempts: 5, directory: { @@ -227,7 +227,11 @@ drill.exec: { registry: ${drill.exec.udf.directory.base}"/registry", tmp: ${drill.exec.udf.directory.base}"/tmp" } - } + }, + # Temporary table can be created ONLY in default temporary workspace. + # Full workspace name should be indicated (including schema and workspace separated by dot). + # Workspace MUST be file-based and writable. Workspace name is case-sensitive. + default_temporary_workspace: "dfs.tmp" } drill.jdbc: { http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index 93916e9..fb84088 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -84,6 +84,7 @@ public class BaseTestQuery extends ExecTest { { put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false"); put(ExecConstants.HTTP_ENABLE, "false"); + put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, TEMP_SCHEMA); } }; http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java index e9a38b0..acbf2e7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestDropTable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -177,7 +177,7 @@ public class TestDropTable extends PlanTestBase { @Test // DRILL-4673 public void testDropTableIfExistsWhileTableExists() throws Exception { - final String existentTableName = "test_table"; + final String existentTableName = "test_table_exists"; test("use dfs_test.tmp"); // successful dropping of existent table @@ -192,7 +192,7 @@ public class TestDropTable extends PlanTestBase { @Test // DRILL-4673 public void testDropTableIfExistsWhileTableDoesNotExist() throws Exception { - final String nonExistentTableName = "test_table"; + final String nonExistentTableName = "test_table_not_exists"; test("use dfs_test.tmp"); // dropping of non existent table without error @@ -200,7 +200,7 @@ public class TestDropTable extends PlanTestBase { .sqlQuery(String.format(DROP_TABLE_IF_EXISTS, nonExistentTableName)) .unOrdered() .baselineColumns("ok", "summary") - .baselineValues(true, String.format("Table [%s] not found", nonExistentTableName)) + 
.baselineValues(false, String.format("Table [%s] not found", nonExistentTableName)) .go(); } @@ -216,7 +216,7 @@ public class TestDropTable extends PlanTestBase { .sqlQuery(String.format(DROP_TABLE_IF_EXISTS, viewName)) .unOrdered() .baselineColumns("ok", "summary") - .baselineValues(true, String.format("Table [%s] not found", viewName)) + .baselineValues(false, String.format("Table [%s] not found", viewName)) .go(); } finally { test(String.format(DROP_VIEW_IF_EXISTS, viewName)); http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java new file mode 100644 index 0000000..f5d45b0 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/TemporaryTablesAutomaticDropTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.user; + +import mockit.Mock; +import mockit.MockUp; +import mockit.integration.junit4.JMockit; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.util.TestUtilities; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.UUID; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(JMockit.class) +public class TemporaryTablesAutomaticDropTest extends BaseTestQuery { + + private static final String session_id = "sessionId"; + + @Before + public void init() throws Exception { + new MockUp() { + @Mock + public UUID randomUUID() { + return UUID.nameUUIDFromBytes(session_id.getBytes()); + } + }; + updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties())); + } + + @Test + public void testAutomaticDropWhenClientIsClosed() throws Exception { + File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("client_closed", + getDfsTestTmpSchemaLocation()); + updateClient("new_client"); + assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists()); + } + + @Test + public void testAutomaticDropWhenDrillbitIsClosed() throws Exception { + File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("drillbit_closed", + getDfsTestTmpSchemaLocation()); + bits[0].close(); + assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists()); + } + + @Test + public void 
testAutomaticDropOfSeveralSessionTemporaryLocations() throws Exception { + File firstSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("first_location", + getDfsTestTmpSchemaLocation()); + StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage(); + String tempDir = TestUtilities.createTempDir(); + try { + TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tempDir); + File secondSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("second_location", tempDir); + updateClient("new_client"); + assertFalse("First session temporary location should be absent", firstSessionTemporaryLocation.exists()); + assertFalse("Second session temporary location should be absent", secondSessionTemporaryLocation.exists()); + } finally { + TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, getDfsTestTmpSchemaLocation()); + } + } + + private File createAndCheckSessionTemporaryLocation(String suffix, String schemaLocation) throws Exception { + String temporaryTableName = "temporary_table_automatic_drop_" + suffix; + test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName); + File sessionTemporaryLocation = new File(schemaLocation, + UUID.nameUUIDFromBytes(session_id.getBytes()).toString()); + assertTrue("Session temporary location should exist", sessionTemporaryLocation.exists()); + return sessionTemporaryLocation; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java index 43d8d57..5bf55af 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestBaseViewSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -143,7 +143,7 @@ public class TestBaseViewSupport extends BaseTestQuery { .sqlQuery(String.format("DROP VIEW IF EXISTS %s", viewFullName)) .unOrdered() .baselineColumns("ok", "summary") - .baselineValues(true, String.format("View [%s] not found in schema [%s].", viewName, finalSchema)) + .baselineValues(false, String.format("View [%s] not found in schema [%s].", viewName, finalSchema)) .go(); } } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java new file mode 100644 index 0000000..93c8cad --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.sql; + +import com.google.common.collect.Lists; +import mockit.Mock; +import mockit.MockUp; +import mockit.integration.junit4.JMockit; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.StorageStrategy; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.WorkspaceConfig; +import org.apache.drill.exec.util.TestUtilities; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +@RunWith(JMockit.class) +public class TestCTTAS extends BaseTestQuery { + + private static final UUID session_id = UUID.nameUUIDFromBytes("sessionId".getBytes()); + private static final String test_schema = "dfs_test"; + private static final String temp2_wk = "tmp2"; + private static final String temp2_schema = String.format("%s.%s", test_schema, temp2_wk); + + private static 
FileSystem fs; + private static FsPermission expectedFolderPermission; + private static FsPermission expectedFilePermission; + + @BeforeClass + public static void init() throws Exception { + MockUp uuidMockUp = mockRandomUUID(session_id); + updateTestCluster(1, DrillConfig.create(cloneDefaultTestConfigProperties())); + uuidMockUp.tearDown(); + + StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage(); + FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(test_schema).getConfig(); + pluginConfig.workspaces.put(temp2_wk, new WorkspaceConfig(TestUtilities.createTempDir(), true, null)); + pluginRegistry.createOrUpdate(test_schema, pluginConfig, true); + + fs = FileSystem.get(new Configuration()); + expectedFolderPermission = new FsPermission(StorageStrategy.TEMPORARY.getFolderPermission()); + expectedFilePermission = new FsPermission(StorageStrategy.TEMPORARY.getFilePermission()); + } + + private static MockUp mockRandomUUID(final UUID uuid) { + return new MockUp() { + @Mock + public UUID randomUUID() { + return uuid; + } + }; + } + + @Test + public void testSyntax() throws Exception { + test("create TEMPORARY table temporary_keyword as select 1 from (values(1))"); + test("create TEMPORARY table temporary_keyword_with_wk as select 1 from (values(1))", TEMP_SCHEMA); + } + + @Test + public void testCreateTableWithDifferentStorageFormats() throws Exception { + List storageFormats = Lists.newArrayList("parquet", "json", "csvh"); + + try { + for (String storageFormat : storageFormats) { + String temporaryTableName = "temp_" + storageFormat; + mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes())); + test("alter session set `store.format`='%s'", storageFormat); + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + checkPermission(temporaryTableName); + + testBuilder() + .sqlQuery("select * from %s", temporaryTableName) + .unOrdered() + .baselineColumns("c1") + 
.baselineValues("A") + .go(); + + testBuilder() + .sqlQuery("select * from %s", temporaryTableName) + .unOrdered() + .sqlBaselineQuery("select * from %s.%s", TEMP_SCHEMA, temporaryTableName) + .go(); + } + } finally { + test("alter session reset `store.format`"); + } + } + + @Test + public void testTemporaryTablesCaseInsensitivity() throws Exception { + String temporaryTableName = "tEmP_InSeNSiTiVe"; + List temporaryTableNames = Lists.newArrayList( + temporaryTableName, + temporaryTableName.toLowerCase(), + temporaryTableName.toUpperCase()); + + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + for (String tableName : temporaryTableNames) { + testBuilder() + .sqlQuery("select * from %s", tableName) + .unOrdered() + .baselineColumns("c1") + .baselineValues("A") + .go(); + } + } + + @Test + public void testPartitionByWithTemporaryTables() throws Exception { + String temporaryTableName = "temporary_table_with_partitions"; + mockRandomUUID(UUID.nameUUIDFromBytes(temporaryTableName.getBytes())); + test("create TEMPORARY table %s partition by (c1) as select * from (" + + "select 'A' as c1 from (values(1)) union all select 'B' as c1 from (values(1))) t", temporaryTableName); + checkPermission(temporaryTableName); + } + + @Test(expected = UserRemoteException.class) + public void testCreationOutsideOfDefaultTemporaryWorkspace() throws Exception { + try { + String temporaryTableName = "temporary_table_outside_of_default_workspace"; + test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", temp2_schema, temporaryTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: Temporary tables are not allowed to be created outside of default temporary workspace [%s].", + TEMP_SCHEMA))); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testCreateWhenTemporaryTableExistsWithoutSchema() throws Exception { + String 
temporaryTableName = "temporary_table_exists_without_schema"; + try { + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: A table or view with given name [%s]" + + " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA))); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testCreateWhenTemporaryTableExistsCaseInsensitive() throws Exception { + String temporaryTableName = "temporary_table_exists_without_schema"; + try { + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName.toUpperCase()); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: A table or view with given name [%s]" + + " already exists in schema [%s]", temporaryTableName.toUpperCase(), TEMP_SCHEMA))); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testCreateWhenTemporaryTableExistsWithSchema() throws Exception { + String temporaryTableName = "temporary_table_exists_with_schema"; + try { + test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName); + test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: A table or view with given name [%s]" + + " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA))); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testCreateWhenPersistentTableExists() throws Exception { + String persistentTableName 
= "persistent_table_exists"; + try { + test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, persistentTableName); + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", persistentTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: A table or view with given name [%s]" + + " already exists in schema [%s]", persistentTableName, TEMP_SCHEMA))); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testCreateWhenViewExists() throws Exception { + String viewName = "view_exists"; + try { + test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, viewName); + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", viewName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: A table or view with given name [%s]" + + " already exists in schema [%s]", viewName, TEMP_SCHEMA))); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testCreatePersistentTableWhenTemporaryTableExists() throws Exception { + String temporaryTableName = "temporary_table_exists_before_persistent"; + try { + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + test("create table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: A table or view with given name [%s]" + + " already exists in schema [%s]", temporaryTableName, TEMP_SCHEMA))); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testCreateViewWhenTemporaryTableExists() throws Exception { + String temporaryTableName = "temporary_table_exists_before_view"; + try { + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + 
test("create view %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: A non-view table with given name [%s] already exists in schema [%s]", + temporaryTableName, TEMP_SCHEMA))); + throw e; + } + } + + @Test + public void testTemporaryAndPersistentTablesPriority() throws Exception { + String name = "temporary_and_persistent_table"; + test("use %s", temp2_schema); + test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name); + test("create table %s as select 'persistent_table' as c1 from (values(1))", name); + + testBuilder() + .sqlQuery("select * from %s", name) + .unOrdered() + .baselineColumns("c1") + .baselineValues("temporary_table") + .go(); + + testBuilder() + .sqlQuery("select * from %s.%s", temp2_schema, name) + .unOrdered() + .baselineColumns("c1") + .baselineValues("persistent_table") + .go(); + + test("drop table %s", name); + + testBuilder() + .sqlQuery("select * from %s", name) + .unOrdered() + .baselineColumns("c1") + .baselineValues("persistent_table") + .go(); + } + + @Test + public void testTemporaryTableAndViewPriority() throws Exception { + String name = "temporary_table_and_view"; + test("use %s", temp2_schema); + test("create TEMPORARY table %s as select 'temporary_table' as c1 from (values(1))", name); + test("create view %s as select 'view' as c1 from (values(1))", name); + + testBuilder() + .sqlQuery("select * from %s", name) + .unOrdered() + .baselineColumns("c1") + .baselineValues("temporary_table") + .go(); + + testBuilder() + .sqlQuery("select * from %s.%s", temp2_schema, name) + .unOrdered() + .baselineColumns("c1") + .baselineValues("view") + .go(); + + test("drop table %s", name); + + testBuilder() + .sqlQuery("select * from %s", name) + .unOrdered() + .baselineColumns("c1") + .baselineValues("view") + .go(); + } + + @Test(expected = UserRemoteException.class) + 
public void testTemporaryTablesInViewDefinitions() throws Exception { + String temporaryTableName = "temporary_table_for_view_definition"; + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + + try { + test("create view %s.view_with_temp_table as select * from %s", TEMP_SCHEMA, temporaryTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: Temporary tables usage is disallowed. Used temporary table name: [%s]", temporaryTableName))); + throw e; + } + } + + @Test + public void testManualDropWithoutSchema() throws Exception { + String temporaryTableName = "temporary_table_to_drop_without_schema"; + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + + testBuilder() + .sqlQuery("drop table %s", temporaryTableName) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName)) + .go(); + } + + @Test + public void testManualDropWithSchema() throws Exception { + String temporaryTableName = "temporary_table_to_drop_with_schema"; + test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName); + + testBuilder() + .sqlQuery("drop table %s.%s", TEMP_SCHEMA, temporaryTableName) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(true, String.format("Temporary table [%s] dropped", temporaryTableName)) + .go(); + } + + @Test + public void testDropTemporaryTableAsViewWithoutException() throws Exception { + String temporaryTableName = "temporary_table_to_drop_like_view_without_exception"; + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + + testBuilder() + .sqlQuery("drop view if exists %s.%s", TEMP_SCHEMA, temporaryTableName) + .unOrdered() + .baselineColumns("ok", "summary") + .baselineValues(false, String.format("View [%s] not found in 
schema [%s].", + temporaryTableName, TEMP_SCHEMA)) + .go(); + } + + @Test(expected = UserRemoteException.class) + public void testDropTemporaryTableAsViewWithException() throws Exception { + String temporaryTableName = "temporary_table_to_drop_like_view_with_exception"; + test("create TEMPORARY table %s as select 'A' as c1 from (values(1))", temporaryTableName); + + try { + test("drop view %s.%s", TEMP_SCHEMA, temporaryTableName); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format( + "VALIDATION ERROR: Unknown view [%s] in schema [%s]", temporaryTableName, TEMP_SCHEMA))); + throw e; + } + } + + private void checkPermission(String tmpTableName) throws IOException { + File[] files = findTemporaryTableLocation(tmpTableName); + assertEquals("Only one directory should match temporary table name " + tmpTableName, 1, files.length); + Path tmpTablePath = new Path(files[0].toURI().getPath()); + assertEquals("Directory permission should match", + expectedFolderPermission, fs.getFileStatus(tmpTablePath).getPermission()); + RemoteIterator fileIterator = fs.listFiles(tmpTablePath, false); + while (fileIterator.hasNext()) { + assertEquals("File permission should match", expectedFilePermission, fileIterator.next().getPermission()); + } + } + + private File[] findTemporaryTableLocation(String tableName) throws IOException { + File sessionTempLocation = new File(getDfsTestTmpSchemaLocation(), session_id.toString()); + Path sessionTempLocationPath = new Path(sessionTempLocation.toURI().getPath()); + assertTrue("Session temporary location must exist", fs.exists(sessionTempLocationPath)); + assertEquals("Session temporary location permission should match", + expectedFolderPermission, fs.getFileStatus(sessionTempLocationPath).getPermission()); + final String tableUUID = UUID.nameUUIDFromBytes(tableName.getBytes()).toString(); + return sessionTempLocation.listFiles(new FileFilter() { + @Override + public boolean accept(File path) { + return 
path.isDirectory() && path.getName().equals(tableUUID); + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java new file mode 100644 index 0000000..6a377ec --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/StorageStrategyTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import com.google.common.io.Files; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class StorageStrategyTest { + + private static final Configuration configuration = new Configuration(); + private static final FsPermission full_permission = new FsPermission("777"); + private static final StorageStrategy persistent_strategy = new StorageStrategy("775", "644", false); + private static final StorageStrategy temporary_strategy = new StorageStrategy("700", "600", true); + private FileSystem fs; + + @Before + public void setup() throws Exception { + initFileSystem(); + } + + @Test + public void testPermissionAndDeleteOnExitFalseForFileWithParent() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path file = addNLevelsAndFile(initialPath, 2, true); + Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false); + + Path createdParentPath = persistent_strategy.createFileAndApply(fs, file); + + assertEquals("Path should match", firstCreatedParentPath, createdParentPath); + checkPathAndPermission(initialPath, file, true, 2, persistent_strategy); + checkDeleteOnExit(firstCreatedParentPath, true); + } + + @Test + public void testPermissionAndDeleteOnExitTrueForFileWithParent() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path file = addNLevelsAndFile(initialPath, 2, true); + Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false); + + Path createdParentPath = temporary_strategy.createFileAndApply(fs, file); + + assertEquals("Path should match", firstCreatedParentPath, createdParentPath); + 
checkPathAndPermission(initialPath, file, true, 2, temporary_strategy); + checkDeleteOnExit(firstCreatedParentPath, false); + } + + @Test + public void testPermissionAndDeleteOnExitFalseForFileOnly() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path file = addNLevelsAndFile(initialPath, 0, true); + + Path createdFile = persistent_strategy.createFileAndApply(fs, file); + + assertEquals("Path should match", file, createdFile); + checkPathAndPermission(initialPath, file, true, 0, persistent_strategy); + checkDeleteOnExit(file, true); + } + + @Test + public void testPermissionAndDeleteOnExitTrueForFileOnly() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path file = addNLevelsAndFile(initialPath, 0, true); + + Path createdFile = temporary_strategy.createFileAndApply(fs, file); + + assertEquals("Path should match", file, createdFile); + checkPathAndPermission(initialPath, file, true, 0, temporary_strategy); + checkDeleteOnExit(file, false); + } + + @Test(expected = IOException.class) + public void testFailureOnExistentFile() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path file = addNLevelsAndFile(initialPath, 0, true); + fs.createNewFile(file); + assertTrue("File should exist", fs.exists(file)); + try { + persistent_strategy.createFileAndApply(fs, file); + } catch (IOException e) { + assertEquals("Error message should match", String.format("File [%s] already exists on file system [%s].", + file.toUri().getPath(), fs.getUri()), e.getMessage()); + throw e; + } + } + + @Test + public void testCreatePathAndDeleteOnExitFalse() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path resultPath = addNLevelsAndFile(initialPath, 2, false); + Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false); + + Path createdParentPath = persistent_strategy.createPathAndApply(fs, resultPath); + + assertEquals("Path should match", firstCreatedParentPath, createdParentPath); + 
checkPathAndPermission(initialPath, resultPath, false, 2, persistent_strategy); + checkDeleteOnExit(firstCreatedParentPath, true); + } + + @Test + public void testCreatePathAndDeleteOnExitTrue() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path resultPath = addNLevelsAndFile(initialPath, 2, false); + Path firstCreatedParentPath = addNLevelsAndFile(initialPath, 1, false); + + Path createdParentPath = temporary_strategy.createPathAndApply(fs, resultPath); + + assertEquals("Path should match", firstCreatedParentPath, createdParentPath); + checkPathAndPermission(initialPath, resultPath, false, 2, temporary_strategy); + checkDeleteOnExit(firstCreatedParentPath, false); + } + + @Test + public void testCreateNoPath() throws Exception { + Path path = prepareStorageDirectory(); + + Path createdParentPath = temporary_strategy.createPathAndApply(fs, path); + + assertNull("Path should be null", createdParentPath); + assertEquals("Permission should match", full_permission, fs.getFileStatus(path).getPermission()); + } + + @Test + public void testStrategyForExistingFile() throws Exception { + Path initialPath = prepareStorageDirectory(); + Path file = addNLevelsAndFile(initialPath, 0, true); + fs.createNewFile(file); + fs.setPermission(file, full_permission); + + assertTrue("File should exist", fs.exists(file)); + assertEquals("Permission should match", full_permission, fs.getFileStatus(file).getPermission()); + + temporary_strategy.applyToFile(fs, file); + + assertEquals("Permission should match", new FsPermission(temporary_strategy.getFilePermission()), + fs.getFileStatus(file).getPermission()); + checkDeleteOnExit(file, false); + } + + private Path prepareStorageDirectory() throws IOException { + File storageDirectory = Files.createTempDir(); + storageDirectory.deleteOnExit(); + Path path = new Path(storageDirectory.toURI().getPath()); + fs.setPermission(path, full_permission); + return path; + } + + private void initFileSystem() throws IOException { + 
if (fs != null) { + try { + fs.close(); + } catch (Exception e) { + // do nothing + } + } + fs = FileSystem.get(configuration); + } + + private Path addNLevelsAndFile(Path initialPath, int levels, boolean addFile) { + Path resultPath = initialPath; + for (int i = 1; i <= levels; i++) { + resultPath = new Path(resultPath, "level" + i); + } + if (addFile) { + resultPath = new Path(resultPath, "test_file.txt"); + } + return resultPath; + } + + private void checkPathAndPermission(Path initialPath, + Path resultPath, + boolean isFile, + int levels, + StorageStrategy storageStrategy) throws IOException { + + assertEquals("Path type should match", isFile, fs.isFile(resultPath)); + assertEquals("Permission should match", full_permission, fs.getFileStatus(initialPath).getPermission()); + + if (isFile) { + assertEquals("Permission should match", new FsPermission(storageStrategy.getFilePermission()), + fs.getFileStatus(resultPath).getPermission()); + } + Path startingPath = initialPath; + FsPermission folderPermission = new FsPermission(storageStrategy.getFolderPermission()); + for (int i = 1; i <= levels; i++) { + startingPath = new Path(startingPath, "level" + i); + assertEquals("Permission should match", folderPermission, fs.getFileStatus(startingPath).getPermission()); + } + } + + private void checkDeleteOnExit(Path path, boolean isPresent) throws IOException { + assertTrue("Path should be present", fs.exists(path)); + // close and open file system to check for path presence + initFileSystem(); + assertEquals("Path existence flag should match", isPresent, fs.exists(path)); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json index 7b977e2..35ca26b 100644 --- 
a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json @@ -9,7 +9,7 @@ writable: false }, "tmp" : { - location: "/tmp/drilltest", + location: "/tmp", writable: true } }, http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java ---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java index fbacd23..cd97ab7 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -176,6 +176,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea } /** + * Closes all resources connected with current session. + * By default has no implementation. + */ + public void closeSession() { + } + + /** * Connection consumer wants to close connection. Initiate connection close * and complete. This is a blocking call that ensures that the connection is * closed before returning. 
As part of this call, the channel close handler http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index c360e51..cdb9c07 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -164,7 +164,11 @@ public abstract class RpcBus imp } final ChannelClosedException ex = future.cause() != null ? new ChannelClosedException(msg, future.cause()) : new ChannelClosedException(msg); - clientConnection.channelClosed(ex); + try { + clientConnection.closeSession(); + } finally { + clientConnection.channelClosed(ex); + } } }