Subject: svn commit: r1660751 [4/23] - in /hive/branches/cbo: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/ accumulo-h...
Date: Wed, 18 Feb 2015 22:28:40 -0000
To: commits@hive.apache.org
From: hashutosh@apache.org
Message-Id: <20150218222849.3EF0BAC030A@hades.apache.org>

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Wed Feb 18 22:28:35 2015 @@ -22,23 +22,20 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hive.common.util.AnnotationUtils; @SuppressWarnings("deprecation") -public class WindowFunctionInfo implements CommonFunctionInfo { - boolean supportsWindow = true; - boolean pivotResult = false; - boolean impliesOrder = false; - FunctionInfo fInfo; - - WindowFunctionInfo(FunctionInfo fInfo) { - assert fInfo.isGenericUDAF(); - this.fInfo = fInfo; - Class wfnCls = fInfo.getGenericUDAFResolver().getClass(); +public class WindowFunctionInfo extends FunctionInfo { + + private final boolean supportsWindow; + private final boolean pivotResult; + private final boolean impliesOrder; + + public WindowFunctionInfo(boolean isNative, String functionName, + GenericUDAFResolver resolver, FunctionResource[] resources) { + super(isNative, functionName, resolver, resources); WindowFunctionDescription def = - AnnotationUtils.getAnnotation(wfnCls, WindowFunctionDescription.class); - if ( def != null) { - supportsWindow = def.supportsWindow(); - pivotResult = def.pivotResult(); - impliesOrder = def.impliesOrder(); - } + AnnotationUtils.getAnnotation(resolver.getClass(), WindowFunctionDescription.class); + supportsWindow = def == null ? true : def.supportsWindow(); + pivotResult = def == null ? false : def.pivotResult(); + impliesOrder = def == null ?
false : def.impliesOrder(); } public boolean isSupportsWindow() { @@ -52,12 +49,4 @@ public class WindowFunctionInfo implemen public boolean isImpliesOrder() { return impliesOrder; } - public FunctionInfo getfInfo() { - return fInfo; - } - - @Override - public Class getFunctionClass() { - return getfInfo().getFunctionClass(); - } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Wed Feb 18 22:28:35 2015 @@ -395,6 +395,7 @@ public class ExecDriver extends Task inputIterator) { + public HiveBaseFunctionResultList(Iterator inputIterator) { this.inputIterator = inputIterator; - this.lastRecordOutput = new HiveKVResultCache(conf); + this.lastRecordOutput = new HiveKVResultCache(); } @Override @@ -87,8 +86,6 @@ public abstract class HiveBaseFunctionRe return true; } - lastRecordOutput.clear(); - // Process the records in the input iterator until // - new output records are available for serving downstream operator, // - input records are exhausted or Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java Wed Feb 18 22:28:35 2015 @@ -17,141 +17,251 @@ */ package org.apache.hadoop.hive.ql.exec.spark; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.ql.io.HiveKey; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.Reporter; import scala.Tuple2; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** - * Wrapper around {@link org.apache.hadoop.hive.ql.exec.persistence.RowContainer} - * - * This class is thread safe. 
+ * A cache with fixed buffer. If the buffer is full, new entries will + * be written to disk. This class is thread safe since multiple threads + * could access it (doesn't have to be concurrently), for example, + * the StreamThread in ScriptOperator. */ -@SuppressWarnings({"deprecation", "unchecked", "rawtypes"}) -public class HiveKVResultCache { - - public static final int IN_MEMORY_CACHE_SIZE = 512; - private static final String COL_NAMES = "key,value"; - private static final String COL_TYPES = - serdeConstants.BINARY_TYPE_NAME + ":" + serdeConstants.BINARY_TYPE_NAME; - - // Used to cache rows added while container is iterated. - private RowContainer backupContainer; - - private RowContainer container; - private Configuration conf; - private int cursor = 0; - - public HiveKVResultCache(Configuration conf) { - container = initRowContainer(conf); - this.conf = conf; +@SuppressWarnings("unchecked") +class HiveKVResultCache { + private static final Log LOG = LogFactory.getLog(HiveKVResultCache.class); + + @VisibleForTesting + static final int IN_MEMORY_NUM_ROWS = 1024; + + private ObjectPair[] writeBuffer; + private ObjectPair[] readBuffer; + + private File parentFile; + private File tmpFile; + + private int readCursor = 0; + private int writeCursor = 0; + + // Indicate if the read buffer has data, for example, + // when in reading, data on disk could be pull in + private boolean readBufferUsed = false; + private int rowsInReadBuffer = 0; + + private Input input; + private Output output; + + public HiveKVResultCache() { + writeBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS]; + readBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS]; + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + writeBuffer[i] = new ObjectPair(); + readBuffer[i] = new ObjectPair(); + } } - private static RowContainer initRowContainer(Configuration conf) { - RowContainer container; - try { - container = new RowContainer(IN_MEMORY_CACHE_SIZE, conf, Reporter.NULL); - - String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - TableDesc tableDesc = - PlanUtils.getDefaultQueryOutputTableDesc(COL_NAMES, COL_TYPES, fileFormat); + private void switchBufferAndResetCursor() { + ObjectPair[] tmp = readBuffer; + rowsInReadBuffer = writeCursor; + readBuffer = writeBuffer; + readBufferUsed = true; + readCursor = 0; + writeBuffer = tmp; + writeCursor = 0; + } + + private void setupOutput() throws IOException { + if (parentFile == null) { + while (true) { + parentFile = File.createTempFile("hive-resultcache", ""); + if (parentFile.delete() && parentFile.mkdir()) { + parentFile.deleteOnExit(); + break; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retry creating tmp result-cache directory..."); + } + } + } - SerDe serDe = (SerDe) tableDesc.getDeserializer(); - ObjectInspector oi = ObjectInspectorUtils.getStandardObjectInspector( - serDe.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); + if (tmpFile == null || input != null) { + tmpFile = File.createTempFile("ResultCache", ".tmp", parentFile); + LOG.info("ResultCache created temp file " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } - container.setSerDe(serDe, oi); - container.setTableDesc(tableDesc); - } catch (Exception ex) { - throw new RuntimeException("Failed to create RowContainer", ex); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(tmpFile); + output = new Output(fos); + } finally { + if (output == null && fos != null) { + fos.close(); + } } - return container; } - public void add(HiveKey key, BytesWritable value) { 
- byte[] hiveKeyBytes = KryoSerializer.serialize(key); - BytesWritable wrappedHiveKey = new BytesWritable(hiveKeyBytes); - List row = new ArrayList(2); - row.add(wrappedHiveKey); - row.add(value); + private BytesWritable readValue(Input input) { + return new BytesWritable(input.readBytes(input.readInt())); + } - synchronized (this) { - try { - if (cursor == 0) { - container.addRow(row); - } else { - if (backupContainer == null) { - backupContainer = initRowContainer(conf); + private void writeValue(Output output, BytesWritable bytesWritable) { + int size = bytesWritable.getLength(); + output.writeInt(size); + output.writeBytes(bytesWritable.getBytes(), 0, size); + } + + private HiveKey readHiveKey(Input input) { + HiveKey hiveKey = new HiveKey( + input.readBytes(input.readInt()), input.readInt()); + hiveKey.setDistKeyLength(input.readInt()); + return hiveKey; + } + + private void writeHiveKey(Output output, HiveKey hiveKey) { + int size = hiveKey.getLength(); + output.writeInt(size); + output.writeBytes(hiveKey.getBytes(), 0, size); + output.writeInt(hiveKey.hashCode()); + output.writeInt(hiveKey.getDistKeyLength()); + } + + public synchronized void add(HiveKey key, BytesWritable value) { + if (writeCursor >= IN_MEMORY_NUM_ROWS) { // Write buffer is full + if (!readBufferUsed) { // Read buffer isn't used, switch buffer + switchBufferAndResetCursor(); + } else { + // Need to spill from write buffer to disk + try { + if (output == null) { + setupOutput(); } - backupContainer.addRow(row); + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + ObjectPair pair = writeBuffer[i]; + writeHiveKey(output, pair.getFirst()); + writeValue(output, pair.getSecond()); + pair.setFirst(null); + pair.setSecond(null); + } + writeCursor = 0; + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to spill rows to disk", e); } - } catch (HiveException ex) { - throw new RuntimeException("Failed to add KV pair to RowContainer", ex); } } + ObjectPair pair = writeBuffer[writeCursor++]; + pair.setFirst(key); + pair.setSecond(value); } public synchronized void clear() { - if (cursor == 0) { - return; - } - try { - container.clearRows(); - } catch (HiveException ex) { - throw new RuntimeException("Failed to clear rows in RowContainer", ex); + writeCursor = readCursor = rowsInReadBuffer = 0; + readBufferUsed = false; + + if (parentFile != null) { + if (input != null) { + try { + input.close(); + } catch (Throwable ignored) { + } + input = null; + } + if (output != null) { + try { + output.close(); + } catch (Throwable ignored) { + } + output = null; + } + try { + FileUtil.fullyDelete(parentFile); + } catch (Throwable ignored) { + } + parentFile = null; + tmpFile = null; } - cursor = 0; } public synchronized boolean hasNext() { - if (container.rowCount() > 0 && cursor < container.rowCount()) { - return true; - } - if (backupContainer == null - || backupContainer.rowCount() == 0) { - return false; - } - clear(); - // Switch containers - RowContainer tmp = container; - container = backupContainer; - backupContainer = tmp; - return true; + return readBufferUsed || writeCursor > 0; } - public Tuple2 next() { - try { - List row; - synchronized (this) { - Preconditions.checkState(hasNext()); - if (cursor == 0) { - row = container.first(); + public synchronized Tuple2 next() { + Preconditions.checkState(hasNext()); + if (!readBufferUsed) { + try { + if (input == null && output != null) { + // Close output stream if open + output.close(); + output = null; + + FileInputStream fis = null; + try 
{ + fis = new FileInputStream(tmpFile); + input = new Input(fis); + } finally { + if (input == null && fis != null) { + fis.close(); + } + } + } + if (input != null) { + // Load next batch from disk + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + ObjectPair pair = readBuffer[i]; + pair.setFirst(readHiveKey(input)); + pair.setSecond(readValue(input)); + } + if (input.eof()) { + input.close(); + input = null; + } + rowsInReadBuffer = IN_MEMORY_NUM_ROWS; + readBufferUsed = true; + readCursor = 0; + } else if (writeCursor == 1) { + ObjectPair pair = writeBuffer[0]; + Tuple2 row = new Tuple2( + pair.getFirst(), pair.getSecond()); + pair.setFirst(null); + pair.setSecond(null); + writeCursor = 0; + return row; } else { - row = container.next(); + // No record on disk, more data in write buffer + switchBufferAndResetCursor(); } - cursor++; + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to load rows from disk", e); } - HiveKey key = KryoSerializer.deserialize(row.get(0).getBytes(), HiveKey.class); - return new Tuple2(key, row.get(1)); - } catch (HiveException ex) { - throw new RuntimeException("Failed to get row from RowContainer", ex); } + ObjectPair pair = readBuffer[readCursor]; + Tuple2 row = new Tuple2( + pair.getFirst(), pair.getSecond()); + pair.setFirst(null); + pair.setSecond(null); + if (++readCursor >= rowsInReadBuffer) { + readBufferUsed = false; + rowsInReadBuffer = 0; + readCursor = 0; + } + return row; } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Wed Feb 18 22:28:35 2015 @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.spark; +import java.util.Iterator; + import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; @@ -25,8 +27,6 @@ import org.apache.hadoop.io.BytesWritabl import scala.Tuple2; -import java.util.Iterator; - public class HiveMapFunction extends HivePairFlatMapFunction< Iterator>, HiveKey, BytesWritable> { @@ -51,7 +51,7 @@ public class HiveMapFunction extends Hiv mapRecordHandler = new SparkMapRecordHandler(); } - HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler); + HiveMapFunctionResultList result = new HiveMapFunctionResultList(it, mapRecordHandler); mapRecordHandler.init(jobConf, result, sparkReporter); return result; Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Wed Feb 18 22:28:35 2015 @@ -17,27 +17,28 @@ */ package org.apache.hadoop.hive.ql.exec.spark; -import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; -import scala.Tuple2; - import java.io.IOException; import java.util.Iterator; +import org.apache.hadoop.io.BytesWritable; + +import scala.Tuple2; + public class HiveMapFunctionResultList extends HiveBaseFunctionResultList> { + private static final long serialVersionUID = 1L; private final SparkRecordHandler recordHandler; /** * Instantiate result set Iterable for Map function output. * - * @param conf Hive configuration. * @param inputIterator Input record iterator. * @param handler Initialized {@link SparkMapRecordHandler} instance. */ - public HiveMapFunctionResultList(Configuration conf, - Iterator> inputIterator, SparkRecordHandler handler) { - super(conf, inputIterator); + public HiveMapFunctionResultList( + Iterator> inputIterator, + SparkRecordHandler handler) { + super(inputIterator); recordHandler = handler; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Wed Feb 18 22:28:35 2015 @@ -18,13 +18,13 @@ package org.apache.hadoop.hive.ql.exec.spark; +import java.util.Iterator; + import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.io.BytesWritable; import scala.Tuple2; -import java.util.Iterator; - public class HiveReduceFunction extends HivePairFlatMapFunction< Iterator>>, HiveKey, BytesWritable> { @@ -42,7 +42,7 @@ public class HiveReduceFunction extends SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler(); HiveReduceFunctionResultList result = - new HiveReduceFunctionResultList(jobConf, it, reducerRecordhandler); + new HiveReduceFunctionResultList(it, reducerRecordhandler); reducerRecordhandler.init(jobConf, result, sparkReporter); return result; Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Wed Feb 18 22:28:35 2015 @@ -17,29 +17,29 @@ */ package org.apache.hadoop.hive.ql.exec.spark; -import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import java.util.Iterator; + import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.io.BytesWritable; -import scala.Tuple2; -import java.io.IOException; -import java.util.Iterator; +import scala.Tuple2; public class HiveReduceFunctionResultList extends HiveBaseFunctionResultList>> { + private static final long serialVersionUID = 1L; private final SparkReduceRecordHandler reduceRecordHandler; /** * Instantiate result set Iterable for Reduce function output. * - * @param conf Hive configuration. * @param inputIterator Input record iterator. 
* @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance. */ - public HiveReduceFunctionResultList(Configuration conf, + public HiveReduceFunctionResultList( Iterator>> inputIterator, - SparkReduceRecordHandler reducer) { - super(conf, inputIterator); + SparkReduceRecordHandler reducer) { + super(inputIterator); this.reduceRecordHandler = reducer; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java Wed Feb 18 22:28:35 2015 @@ -19,11 +19,11 @@ package org.apache.hadoop.hive.ql.exec.spark; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.Utilities; Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Wed Feb 18 22:28:35 2015 @@ -22,8 +22,8 @@ import com.google.common.base.Strings; import java.io.IOException; import java.io.Serializable; -import java.net.MalformedURLException; -import java.net.URL; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -77,8 +77,8 @@ public class RemoteHiveSparkClient imple private transient SparkConf sparkConf; private transient HiveConf hiveConf; - private transient List localJars = new ArrayList(); - private transient List localFiles = new ArrayList(); + private transient List localJars = new ArrayList(); + private transient List localFiles = new ArrayList(); private final transient long sparkClientTimtout; @@ -128,7 +128,7 @@ public class RemoteHiveSparkClient imple return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus); } - private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { + private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException { // add hive-exec jar addJars((new JobConf(this.getClass())).getJar()); @@ -160,30 +160,32 @@ public class RemoteHiveSparkClient imple addResources(addedArchives); } - private void addResources(String addedFiles) { + private void addResources(String addedFiles) throws IOException { for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) { try { - URL fileUrl = SparkUtilities.getURL(addedFile); - if (fileUrl != null && !localFiles.contains(fileUrl)) { - localFiles.add(fileUrl); - remoteClient.addFile(fileUrl); + URI fileUri = 
SparkUtilities.getURI(addedFile); + if (fileUri != null && !localFiles.contains(fileUri)) { + fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf); + localFiles.add(fileUri); + remoteClient.addFile(fileUri); } - } catch (MalformedURLException e) { - LOG.warn("Failed to add file:" + addedFile); + } catch (URISyntaxException e) { + LOG.warn("Failed to add file:" + addedFile, e); } } } - private void addJars(String addedJars) { + private void addJars(String addedJars) throws IOException { for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) { try { - URL jarUrl = SparkUtilities.getURL(addedJar); - if (jarUrl != null && !localJars.contains(jarUrl)) { - localJars.add(jarUrl); - remoteClient.addJar(jarUrl); + URI jarUri = SparkUtilities.getURI(addedJar); + if (jarUri != null && !localJars.contains(jarUri)) { + jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf); + localJars.add(jarUri); + remoteClient.addJar(jarUri); } - } catch (MalformedURLException e) { - LOG.warn("Failed to add jar:" + addedJar); + } catch (URISyntaxException e) { + LOG.warn("Failed to add jar:" + addedJar, e); } } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Wed Feb 18 22:28:35 2015 @@ -18,11 +18,15 @@ package org.apache.hadoop.hive.ql.exec.spark; import java.io.File; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager; @@ -50,25 +54,50 @@ public class SparkUtilities { return copy; } - public static URL getURL(String path) throws MalformedURLException { + public static URI getURI(String path) throws URISyntaxException { if (path == null) { return null; } - URL url = null; - try { URI uri = new URI(path); - if (uri.getScheme() != null) { - url = uri.toURL(); - } else { + if (uri.getScheme() == null) { // if no file schema in path, we assume it's file on local fs. - url = new File(path).toURI().toURL(); + uri = new File(path).toURI(); } - } catch (URISyntaxException e) { - // do nothing here, just return null if input path is not a valid URI. + + return uri; + } + + /** + * Copies local file to HDFS in yarn-cluster mode. 
+ * + * @param source + * @param conf + * @return + * @throws IOException + */ + public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException { + URI result = source; + if (conf.get("spark.master").equals("yarn-cluster")) { + if (!source.getScheme().equals("hdfs")) { + Path tmpDir = SessionState.getHDFSSessionPath(conf); + FileSystem fileSystem = FileSystem.get(conf); + fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir); + String filePath = tmpDir + File.separator + getFileName(source); + Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath(); + result = fullPath.toUri(); + } + } + return result; + } + + private static String getFileName(URI uri) { + if (uri == null) { + return null; } - return url; + String[] splits = uri.getPath().split(File.separator); + return splits[splits.length-1]; } public static SparkSession getSparkSession(HiveConf conf, Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java Wed Feb 18 22:28:35 2015 @@ -67,7 +67,7 @@ public class HiveHistoryImpl implements private static final String DELIMITER = " "; - private static final String ROW_COUNT_PATTERN = "TABLE_ID_(\\d+)_ROWCOUNT"; + private static final String ROW_COUNT_PATTERN = "RECORDS_OUT_(\\d+)(_)*(\\S+)*"; private static final Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN); @@ -343,6 +343,10 @@ public class HiveHistoryImpl implements if (m.find()) { String tuple = m.group(1); + String tableName = m.group(3); + if (tableName != null) + return tableName; + return idToTableMap.get(tuple); } return null; Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java Wed Feb 18 22:28:35 2015 @@ -17,11 +17,9 @@ */ package org.apache.hadoop.hive.ql.hooks; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -37,10 +35,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; - import org.json.JSONObject; -import static org.apache.hadoop.hive.ql.hooks.HookContext.HookType.*; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * ATSHook sends query + plan info to Yarn App Timeline Server. 
To enable (hadoop 2.4 and up) set @@ -55,7 +52,7 @@ public class ATSHook implements ExecuteW private enum EntityTypes { HIVE_QUERY_ID }; private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED }; private enum OtherInfoTypes { QUERY, STATUS, TEZ, MAPRED }; - private enum PrimaryFilterTypes { user }; + private enum PrimaryFilterTypes { user, operationid }; private static final int WAIT_TIME = 3; public ATSHook() { @@ -101,6 +98,7 @@ public class ATSHook implements ExecuteW return; } String queryId = plan.getQueryId(); + String opId = hookContext.getOperationId(); long queryStartTime = plan.getQueryStartTime(); String user = hookContext.getUgi().getUserName(); int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size(); @@ -119,13 +117,13 @@ public class ATSHook implements ExecuteW JSONObject explainPlan = explain.getJSONPlan(null, null, rootTasks, plan.getFetchTask(), true, false, false); fireAndForget(conf, createPreHookEvent(queryId, query, - explainPlan, queryStartTime, user, numMrJobs, numTezJobs)); + explainPlan, queryStartTime, user, numMrJobs, numTezJobs, opId)); break; case POST_EXEC_HOOK: - fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, true)); + fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, true, opId)); break; case ON_FAILURE_HOOK: - fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, false)); + fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, false, opId)); break; default: //ignore @@ -139,7 +137,7 @@ public class ATSHook implements ExecuteW } TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan, - long startTime, String user, int numMrJobs, int numTezJobs) throws Exception { + long startTime, String user, int numMrJobs, int numTezJobs, String opId) throws Exception { JSONObject queryObj = new JSONObject(); queryObj.put("queryText", query); @@ -148,12 +146,16 @@ public class ATSHook implements ExecuteW LOG.info("Received pre-hook notification for :" + queryId); if (LOG.isDebugEnabled()) { LOG.debug("Otherinfo: " + queryObj.toString()); + LOG.debug("Operation id: <" + opId + ">"); } TimelineEntity atsEntity = new TimelineEntity(); atsEntity.setEntityId(queryId); atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name()); atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user); + if (opId != null) { + atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId); + } TimelineEvent startEvt = new TimelineEvent(); startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name()); @@ -166,13 +168,17 @@ public class ATSHook implements ExecuteW return atsEntity; } - TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, boolean success) { + TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, boolean success, + String opId) { LOG.info("Received post-hook notification for :" + queryId); TimelineEntity atsEntity = new TimelineEntity(); atsEntity.setEntityId(queryId); atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name()); atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user); + if (opId != null) { + atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId); + } TimelineEvent stopEvt = new TimelineEvent(); stopEvt.setEventType(EventTypes.QUERY_COMPLETED.name()); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java URL: 
http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java Wed Feb 18 22:28:35 2015 @@ -29,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; /** @@ -53,9 +52,13 @@ public class HookContext { final private Map inputPathToContentSummary; private final String ipAddress; private final String userName; + // unique id set for operation when run from HS2, base64 encoded value of + // TExecuteStatementResp.TOperationHandle.THandleIdentifier.guid + private final String operationId; public HookContext(QueryPlan queryPlan, HiveConf conf, - Map inputPathToContentSummary, String userName, String ipAddress) throws Exception { + Map inputPathToContentSummary, String userName, String ipAddress, + String operationId) throws Exception { this.queryPlan = queryPlan; this.conf = conf; this.inputPathToContentSummary = inputPathToContentSummary; @@ -67,8 +70,9 @@ public class HookContext { if(SessionState.get() != null){ linfo = SessionState.get().getLineageState().getLineageInfo(); } - this.ipAddress = ipAddress; this.userName = userName; + this.ipAddress = ipAddress; + this.operationId = operationId; } public QueryPlan getQueryPlan() { @@ -154,4 +158,8 @@ public class HookContext { public String getUserName() { return this.userName; } + + public String getOperationId() { + return operationId; + } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java Wed Feb 18 22:28:35 2015 @@ -35,7 +35,7 @@ import org.apache.hadoop.hive.common.Fil import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; @@ -144,7 +144,7 @@ public class HiveIndexResult { int firstEnd = 0; int i = 0; for (int index = 0; index < bytes.length; index++) { - if (bytes[index] == LazySimpleSerDe.DefaultSeparators[0]) { + if (bytes[index] == LazySerDeParameters.DefaultSeparators[0]) { i++; firstEnd = index; } @@ -169,7 +169,7 @@ public class HiveIndexResult { int currentStart = firstEnd + 1; int currentEnd = firstEnd + 1; for (; currentEnd < bytes.length; currentEnd++) { - if (bytes[currentEnd] == LazySimpleSerDe.DefaultSeparators[1]) { + if (bytes[currentEnd] == LazySerDeParameters.DefaultSeparators[1]) { String 
one_offset = new String(bytes, currentStart, currentEnd - currentStart); Long offset = Long.parseLong(one_offset); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java Wed Feb 18 22:28:35 2015 @@ -204,7 +204,7 @@ public class IndexPredicateAnalyzer { private ExprNodeDesc analyzeExpr( ExprNodeGenericFuncDesc expr, List searchConditions, - Object... nodeOutputs) { + Object... nodeOutputs) throws SemanticException { if (FunctionRegistry.isOpAnd(expr)) { assert(nodeOutputs.length == 2); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Wed Feb 18 22:28:35 2015 @@ -129,9 +129,10 @@ public final class FileDump { OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); long sectionStart = stripeStart; for(OrcProto.Stream section: footer.getStreamsList()) { + String kind = section.hasKind() ? section.getKind().name() : "UNKNOWN"; System.out.println(" Stream: column " + section.getColumn() + - " section " + section.getKind() + " start: " + sectionStart + - " length " + section.getLength()); + " section " + kind + " start: " + sectionStart + + " length " + section.getLength()); sectionStart += section.getLength(); } for (int i = 0; i < footer.getColumnsCount(); ++i) { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Feb 18 22:28:35 2015 @@ -2989,7 +2989,7 @@ class RecordReaderImpl implements Record // figure out which columns have a present stream boolean[] hasNull = new boolean[types.size()]; for(OrcProto.Stream stream: streamList) { - if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) { + if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) { hasNull[stream.getColumn()] = true; } } @@ -2997,7 +2997,9 @@ class RecordReaderImpl implements Record long length = stream.getLength(); int column = stream.getColumn(); OrcProto.Stream.Kind streamKind = stream.getKind(); - if (StreamName.getArea(streamKind) == StreamName.Area.DATA && + // since stream kind is optional, first check if it exists + if (stream.hasKind() && + (StreamName.getArea(streamKind) == StreamName.Area.DATA) && includedColumns[column]) { // if we aren't filtering or it is a dictionary, load it. 
if (includedRowGroups == null || @@ -3134,8 +3136,10 @@ class RecordReaderImpl implements Record long offset = 0; for(OrcProto.Stream streamDesc: streamDescriptions) { int column = streamDesc.getColumn(); + // do not create stream if stream kind does not exist if ((includeColumn == null || includeColumn[column]) && - StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) { + streamDesc.hasKind() && + (StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA)) { long length = streamDesc.getLength(); int first = -1; int last = -2; @@ -3381,7 +3385,7 @@ class RecordReaderImpl implements Record int len = (int) stream.getLength(); // row index stream and bloom filter are interlaced, check if the sarg column contains bloom // filter and combine the io to read row index and bloom filters for that column together - if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) { + if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) { boolean readBloomFilter = false; if (sargColumns != null && sargColumns[col] && nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed Feb 18 22:28:35 2015 @@ -26,6 +26,7 @@ import java.lang.management.ManagementFa import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -202,7 +203,14 @@ class WriterImpl implements Writer, Memo allColumns = getColumnNamesFromInspector(inspector); } this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize); - this.bloomFilterColumns = OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector); + if (version == OrcFile.Version.V_0_11) { + /* do not write bloom filters for ORC v11 */ + this.bloomFilterColumns = + OrcUtils.includeColumns(null, allColumns, inspector); + } else { + this.bloomFilterColumns = + OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector); + } this.bloomFilterFpp = bloomFilterFpp; treeWriter = createTreeWriter(inspector, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Feb 18 22:28:35 2015 @@ -29,7 +29,6 @@ import static org.apache.hadoop.hive.ser import java.io.FileNotFoundException; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -49,8 +48,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -73,7 +70,6 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.EventRequestType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventRequestData; @@ -102,6 +98,8 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -156,6 +154,31 @@ public class Hive { } }; + // register all permanent functions. need improvement + static { + try { + reloadFunctions(); + } catch (Exception e) { + LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e); + } + } + + public static void reloadFunctions() throws HiveException { + Hive db = Hive.get(); + for (String dbName : db.getAllDatabases()) { + for (String functionName : db.getFunctions(dbName, "*")) { + Function function = db.getFunction(dbName, functionName); + try { + FunctionRegistry.registerPermanentFunction(functionName, function.getClassName(), false, + FunctionTask.toFunctionResource(function.getResourceUris())); + } catch (Exception e) { + LOG.warn("Failed to register persistent function " + + functionName + ":" + function.getClassName() + ". Ignore and continue."); + } + } + } + } + public static Hive get(Configuration c, Class clazz) throws HiveException { return get(c instanceof HiveConf ? 
(HiveConf)c : new HiveConf(c, clazz)); } @@ -1590,10 +1613,18 @@ private void constructOneLBLocationMap(F throws HiveException { List newFiles = new ArrayList(); Table tbl = getTable(tableName); + HiveConf sessionConf = SessionState.getSessionConf(); if (replace) { - tbl.replaceFiles(loadPath, isSrcLocal); + Path tableDest = tbl.getPath(); + replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal); } else { - tbl.copyFiles(loadPath, isSrcLocal, isAcid, newFiles); + FileSystem fs; + try { + fs = tbl.getDataLocation().getFileSystem(sessionConf); + copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles); + } catch (IOException e) { + throw new HiveException("addFiles: filesystem error in check phase", e); + } tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true"); } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Wed Feb 18 22:28:35 2015 @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.OutputFormat; @@ -246,10 +247,8 @@ public class Partition implements Serial final public Deserializer getDeserializer() { if (deserializer == null) { try { - deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(), + deserializer = MetaStoreUtils.getDeserializer(SessionState.getSessionConf(), tPartition, table.getTTable()); - } catch (HiveException e) { - throw new RuntimeException(e); } catch (MetaException e) { throw new RuntimeException(e); } @@ -367,7 +366,7 @@ public class Partition implements Serial try { // Previously, this got the filesystem of the Table, which could be // different from the filesystem of the partition. 
- FileSystem fs = getDataLocation().getFileSystem(Hive.get().getConf()); + FileSystem fs = getDataLocation().getFileSystem(SessionState.getSessionConf()); String pathPattern = getDataLocation().toString(); if (getBucketCount() > 0) { pathPattern = pathPattern + "/*"; @@ -495,11 +494,11 @@ public class Partition implements Serial public List getCols() { try { - if (Table.hasMetastoreBasedSchema(Hive.get().getConf(), tPartition.getSd())) { + if (Table.hasMetastoreBasedSchema(SessionState.getSessionConf(), tPartition.getSd())) { return tPartition.getSd().getCols(); } - return Hive.getFieldsFromDeserializer(table.getTableName(), getDeserializer()); - } catch (HiveException e) { + return MetaStoreUtils.getFieldsFromDeserializer(table.getTableName(), getDeserializer()); + } catch (Exception e) { LOG.error("Unable to get cols from serde: " + tPartition.getSd().getSerdeInfo().getSerializationLib(), e); } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Feb 18 22:28:35 2015 @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.metadata; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -41,7 +40,6 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.ProtectMode; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SerDeInfo; @@ -259,7 +257,7 @@ public class Table implements Serializab } final public Class getDeserializerClass() throws Exception { - return MetaStoreUtils.getDeserializerClass(Hive.get().getConf(), tTable); + return MetaStoreUtils.getDeserializerClass(SessionState.getSessionConf(), tTable); } final public Deserializer getDeserializer(boolean skipConfError) { @@ -271,11 +269,9 @@ public class Table implements Serializab final public Deserializer getDeserializerFromMetaStore(boolean skipConfError) { try { - return MetaStoreUtils.getDeserializer(Hive.get().getConf(), tTable, skipConfError); + return MetaStoreUtils.getDeserializer(SessionState.getSessionConf(), tTable, skipConfError); } catch (MetaException e) { throw new RuntimeException(e); - } catch (HiveException e) { - throw new RuntimeException(e); } } @@ -285,7 +281,7 @@ public class Table implements Serializab } try { storageHandler = HiveUtils.getStorageHandler( - Hive.get().getConf(), + SessionState.getSessionConf(), getProperty( org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE)); } catch (Exception e) { @@ -589,12 +585,12 @@ public class Table implements Serializab String serializationLib = getSerializationLib(); try { - if (hasMetastoreBasedSchema(Hive.get().getConf(), serializationLib)) { + if (hasMetastoreBasedSchema(SessionState.getSessionConf(), serializationLib)) { return tTable.getSd().getCols(); } else { - return Hive.getFieldsFromDeserializer(getTableName(), getDeserializer()); + return 
MetaStoreUtils.getFieldsFromDeserializer(getTableName(), getDeserializer()); } - } catch (HiveException e) { + } catch (Exception e) { LOG.error("Unable to get field from serde: " + serializationLib, e); } return new ArrayList(); @@ -625,45 +621,6 @@ public class Table implements Serializab return tTable.getSd().getNumBuckets(); } - /** - * Replaces the directory corresponding to the table by srcf. Works by - * deleting the table directory and renaming the source directory. - * - * @param srcf - * Source directory - * @param isSrcLocal - * If the source directory is LOCAL - */ - protected void replaceFiles(Path srcf, boolean isSrcLocal) - throws HiveException { - Path tableDest = getPath(); - Hive.replaceFiles(tableDest, srcf, tableDest, tableDest, Hive.get().getConf(), - isSrcLocal); - } - - /** - * Inserts files specified into the partition. Works by moving files - * - * @param srcf - * Files to be moved. Leaf directories or globbed file paths - * @param isSrcLocal - * If the source directory is LOCAL - * @param isAcid - * True if this is an ACID based insert, update, or delete - * @param newFiles optional list of paths. If non-null, then all files copyied to the table - * will be added to this list. - */ - protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid, List newFiles) - throws HiveException { - FileSystem fs; - try { - fs = getDataLocation().getFileSystem(Hive.get().getConf()); - Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid, newFiles); - } catch (IOException e) { - throw new HiveException("addFiles: filesystem error in check phase", e); - } - } - public void setInputFormatClass(String name) throws HiveException { if (name == null) { inputFormatClass = null; @@ -934,22 +891,12 @@ public class Table implements Serializab return dbName + "@" + tabName; } - /** - * @return List containing Indexes names if there are indexes on this table - * @throws HiveException - **/ - public List getAllIndexes(short max) throws HiveException { - Hive hive = Hive.get(); - return hive.getIndexes(getTTable().getDbName(), getTTable().getTableName(), max); - } - @SuppressWarnings("nls") public FileStatus[] getSortedPaths() { try { // Previously, this got the filesystem of the Table, which could be // different from the filesystem of the partition. 
- FileSystem fs = FileSystem.get(getPath().toUri(), Hive.get() - .getConf()); + FileSystem fs = FileSystem.get(getPath().toUri(), SessionState.getSessionConf()); String pathPattern = getPath().toString(); if (getNumBuckets() > 0) { pathPattern = pathPattern + "/*"; Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Feb 18 22:28:35 2015 @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -68,9 +69,11 @@ import org.apache.hadoop.hive.ql.plan.Se import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; +import org.apache.hadoop.hive.ql.plan.ptf.PTFInputDef; +import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; +import org.apache.hadoop.hive.ql.plan.ptf.ShapeDetails; import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef; -import org.apache.hadoop.hive.ql.udf.ptf.Noop; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -260,34 +263,37 @@ public final class ColumnPrunerProcFacto PTFDesc conf = op.getConf(); //Since we cannot know what columns will be needed by a PTF chain, //we do not prune columns on PTFOperator for PTF chains. - if (!conf.forWindowing() && !Noop.class.isInstance(conf.getFuncDef().getTFunction())) { + PartitionedTableFunctionDef funcDef = conf.getFuncDef(); + List referencedColumns = funcDef.getReferencedColumns(); + if (!conf.forWindowing() && !conf.forNoop() && referencedColumns == null) { return super.process(nd, stack, cppCtx, nodeOutputs); } List prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0)); - - WindowTableFunctionDef def = null; if (conf.forWindowing()) { - def = (WindowTableFunctionDef) conf.getFuncDef(); + WindowTableFunctionDef def = (WindowTableFunctionDef) funcDef; prunedCols = Utilities.mergeUniqElems(getWindowFunctionColumns(def), prunedCols); - prunedCols = prunedColumnsList(prunedCols, def); - } - - RowSchema oldRS = op.getSchema(); - ArrayList sig = buildPrunedRR(prunedCols, oldRS); - op.getSchema().setSignature(sig); + } else if (conf.forNoop()) { + prunedCols = new ArrayList(cppCtx.getPrunedColList(op.getChildOperators().get(0))); + } else { + prunedCols = referencedColumns; + } + + List newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef); + + op.getSchema().setSignature(new ArrayList(newRS)); - prunedCols = def == null ? 
prunedCols : prunedInputList(prunedCols, def); - cppCtx.getPrunedColLists().put(op, prunedCols); + ShapeDetails outputShape = funcDef.getStartOfChain().getInput().getOutputShape(); + cppCtx.getPrunedColLists().put(op, outputShape.getColumnNames()); return null; } - private static ArrayList buildPrunedRR(List prunedCols, - RowSchema oldRS) throws SemanticException{ + private List buildPrunedRS(List prunedCols, RowSchema oldRS) + throws SemanticException { ArrayList sig = new ArrayList(); HashSet prunedColsSet = new HashSet(prunedCols); - for(ColumnInfo cInfo : oldRS.getSignature()) { - if ( prunedColsSet.contains(cInfo.getInternalName())) { + for (ColumnInfo cInfo : oldRS.getSignature()) { + if (prunedColsSet.contains(cInfo.getInternalName())) { sig.add(cInfo); } } @@ -305,48 +311,74 @@ public final class ColumnPrunerProcFacto return columns; } + private RowResolver buildPrunedRR(List prunedCols, RowSchema oldRS) + throws SemanticException { + RowResolver resolver = new RowResolver(); + HashSet prunedColsSet = new HashSet(prunedCols); + for (ColumnInfo cInfo : oldRS.getSignature()) { + if (prunedColsSet.contains(cInfo.getInternalName())) { + resolver.put(cInfo.getTabAlias(), cInfo.getAlias(), cInfo); + } + } + return resolver; + } + /* * add any input columns referenced in WindowFn args or expressions. */ - private ArrayList prunedColumnsList(List prunedCols, - WindowTableFunctionDef tDef) { - //we create a copy of prunedCols to create a list of pruned columns for PTFOperator - ArrayList mergedColList = new ArrayList(prunedCols); - if ( tDef.getWindowFunctions() != null ) { - for(WindowFunctionDef wDef : tDef.getWindowFunctions() ) { - if ( wDef.getArgs() == null) { - continue; + private List prunedColumnsList(List prunedCols, RowSchema oldRS, + PartitionedTableFunctionDef pDef) throws SemanticException { + pDef.getOutputShape().setRr(null); + pDef.getOutputShape().setColumnNames(null); + if (pDef instanceof WindowTableFunctionDef) { + WindowTableFunctionDef tDef = (WindowTableFunctionDef) pDef; + if (tDef.getWindowFunctions() != null) { + for (WindowFunctionDef wDef : tDef.getWindowFunctions()) { + if (wDef.getArgs() == null) { + continue; + } + for (PTFExpressionDef arg : wDef.getArgs()) { + ExprNodeDesc exprNode = arg.getExprNode(); + Utilities.mergeUniqElems(prunedCols, exprNode.getCols()); + } + } + } + if (tDef.getPartition() != null) { + for (PTFExpressionDef col : tDef.getPartition().getExpressions()) { + ExprNodeDesc exprNode = col.getExprNode(); + Utilities.mergeUniqElems(prunedCols, exprNode.getCols()); } - for(PTFExpressionDef arg : wDef.getArgs()) { - ExprNodeDesc exprNode = arg.getExprNode(); - Utilities.mergeUniqElems(mergedColList, exprNode.getCols()); + } + if (tDef.getOrder() != null) { + for (PTFExpressionDef col : tDef.getOrder().getExpressions()) { + ExprNodeDesc exprNode = col.getExprNode(); + Utilities.mergeUniqElems(prunedCols, exprNode.getCols()); } } + } else { + pDef.getOutputShape().setRr(buildPrunedRR(prunedCols, oldRS)); } - if(tDef.getPartition() != null){ - for(PTFExpressionDef col : tDef.getPartition().getExpressions()){ - ExprNodeDesc exprNode = col.getExprNode(); - Utilities.mergeUniqElems(mergedColList, exprNode.getCols()); - } - } - if(tDef.getOrder() != null){ - for(PTFExpressionDef col : tDef.getOrder().getExpressions()){ - ExprNodeDesc exprNode = col.getExprNode(); - Utilities.mergeUniqElems(mergedColList, exprNode.getCols()); - } - } - return mergedColList; + + PTFInputDef input = pDef.getInput(); + if (input instanceof PartitionedTableFunctionDef) { 
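
// A stripped-down illustration (not from this commit) of the accumulation step the
// rewritten prunedColumnsList() above performs: every column referenced by a window
// function argument, partition key, or order key is folded into the pruned-column list.
// Utilities.mergeUniqElems is assumed to append only elements not already present; the
// helper class and method names are invented for the example.
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;

public class PtfColumnCollectorSketch {
  static void addReferencedColumns(List<String> prunedCols, List<PTFExpressionDef> exprs) {
    if (exprs == null) {
      return;
    }
    for (PTFExpressionDef e : exprs) {
      ExprNodeDesc exprNode = e.getExprNode();
      Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
    }
  }
}
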
+ return prunedColumnsList(prunedCols, oldRS, (PartitionedTableFunctionDef)input); + } + + ArrayList inputColumns = prunedInputList(prunedCols, input); + input.getOutputShape().setRr(buildPrunedRR(inputColumns, oldRS)); + input.getOutputShape().setColumnNames(inputColumns); + + return buildPrunedRS(prunedCols, oldRS); } /* * from the prunedCols list filter out columns that refer to WindowFns or WindowExprs * the returned list is set as the prunedList needed by the PTFOp. */ - private ArrayList prunedInputList(List prunedCols, - WindowTableFunctionDef tDef) { + private ArrayList prunedInputList(List prunedCols, PTFInputDef tDef) { ArrayList prunedInputCols = new ArrayList(); - StructObjectInspector OI = tDef.getInput().getOutputShape().getOI(); + StructObjectInspector OI = tDef.getOutputShape().getOI(); for(StructField f : OI.getAllStructFieldRefs()) { String fName = f.getFieldName(); if ( prunedCols.contains(fName)) { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Feb 18 22:28:35 2015 @@ -90,7 +90,7 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileMergeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; @@ -586,7 +586,7 @@ public final class GenMapRedUtils { // Later the properties have to come from the partition as opposed // to from the table in order to support versioning. 
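
// A minimal sketch (not part of this patch) of what prunedInputList() above boils down
// to: keep only those pruned column names that actually occur in the input shape's
// ObjectInspector. Class and method names here are illustrative only.
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class InputColumnFilterSketch {
  static List<String> filterToInputColumns(List<String> prunedCols, StructObjectInspector inputOI) {
    List<String> result = new ArrayList<String>();
    for (StructField f : inputOI.getAllStructFieldRefs()) {
      if (prunedCols.contains(f.getFieldName())) {
        result.add(f.getFieldName());
      }
    }
    return result;
  }
}
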
Path[] paths = null; - sampleDesc sampleDescr = parseCtx.getOpToSamplePruner().get(topOp); + SampleDesc sampleDescr = parseCtx.getOpToSamplePruner().get(topOp); // Lookup list bucketing pruner Map partToPruner = parseCtx.getOpToPartToSkewedPruner().get(topOp); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java Wed Feb 18 22:28:35 2015 @@ -190,7 +190,7 @@ public final class IndexUtils { List indexesOnTable; try { - indexesOnTable = baseTableMetaData.getAllIndexes((short) -1); // get all indexes + indexesOnTable = getAllIndexes(baseTableMetaData, (short) -1); // get all indexes } catch (HiveException e) { throw new SemanticException("Error accessing metastore", e); } @@ -204,6 +204,14 @@ public final class IndexUtils { return matchingIndexes; } + /** + * @return List containing Indexes names if there are indexes on this table + * @throws HiveException + **/ + public static List getAllIndexes(Table table, short max) throws HiveException { + Hive hive = Hive.get(); + return hive.getIndexes(table.getTTable().getDbName(), table.getTTable().getTableName(), max); + } public static Task createRootTask(HiveConf builderConf, Set inputs, Set outputs, StringBuilder command, Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SamplePruner.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SamplePruner.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SamplePruner.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SamplePruner.java Wed Feb 18 22:28:35 2015 @@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.FilterDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc; /** * The transformation step that does sample pruning. @@ -61,17 +61,17 @@ public class SamplePruner implements Tra * */ public static class SamplePrunerCtx implements NodeProcessorCtx { - HashMap opToSamplePruner; + HashMap opToSamplePruner; public SamplePrunerCtx( - HashMap opToSamplePruner) { + HashMap opToSamplePruner) { this.opToSamplePruner = opToSamplePruner; } /** * @return the opToSamplePruner */ - public HashMap getOpToSamplePruner() { + public HashMap getOpToSamplePruner() { return opToSamplePruner; } @@ -80,7 +80,7 @@ public class SamplePruner implements Tra * the opToSamplePruner to set */ public void setOpToSamplePruner( - HashMap opToSamplePruner) { + HashMap opToSamplePruner) { this.opToSamplePruner = opToSamplePruner; } } @@ -135,7 +135,7 @@ public class SamplePruner implements Tra Object... 
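
// For context, a hypothetical caller of the new static IndexUtils.getAllIndexes() shown
// above (this snippet is not part of the commit). Passing -1 mirrors the existing call
// site and is assumed to mean "no limit"; the element type of the returned list is
// assumed to be the metastore Index class.
import java.util.List;

import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.IndexUtils;

public class IndexLookupSketch {
  static List<Index> indexesOn(Table table) throws HiveException {
    return IndexUtils.getAllIndexes(table, (short) -1);
  }
}
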
nodeOutputs) throws SemanticException { FilterOperator filOp = (FilterOperator) nd; FilterDesc filOpDesc = filOp.getConf(); - sampleDesc sampleDescr = filOpDesc.getSampleDescr(); + SampleDesc sampleDescr = filOpDesc.getSampleDescr(); if ((sampleDescr == null) || !sampleDescr.getInputPruning()) { return null; @@ -182,7 +182,7 @@ public class SamplePruner implements Tra * @throws SemanticException */ @SuppressWarnings("nls") - public static Path[] prune(Partition part, sampleDesc sampleDescr) + public static Path[] prune(Partition part, SampleDesc sampleDescr) throws SemanticException { int num = sampleDescr.getNumerator(); int den = sampleDescr.getDenominator(); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java Wed Feb 18 22:28:35 2015 @@ -64,7 +64,7 @@ import com.google.common.collect.Immutab import com.google.common.collect.Maps; public class SqlFunctionConverter { - private static final Log LOG = LogFactory.getLog(SqlFunctionConverter.class); + private static final Log LOG = LogFactory.getLog(SqlFunctionConverter.class); static final Map hiveToCalcite; static final Map calciteToHiveToken; @@ -79,7 +79,7 @@ public class SqlFunctionConverter { public static SqlOperator getCalciteOperator(String funcTextName, GenericUDF hiveUDF, ImmutableList calciteArgTypes, RelDataType retType) - throws CalciteSemanticException { + throws SemanticException { // handle overloaded methods first if (hiveUDF instanceof GenericUDFOPNegative) { return SqlStdOperatorTable.UNARY_MINUS; @@ -182,7 +182,8 @@ public class SqlFunctionConverter { } catch (UDFArgumentException e) { throw new RuntimeException(e); } - return new FunctionInfo(fi.isNative(), fi.getDisplayName(), (GenericUDF) udf); + return new FunctionInfo( + fi.isNative(), fi.getDisplayName(), (GenericUDF) udf, fi.getResources()); } // TODO: 1) handle Agg Func Name translation 2) is it correct to add func Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Wed Feb 18 22:28:35 2015 @@ -348,7 +348,7 @@ public class PartitionPruner implements if (!(expr instanceof ExprNodeGenericFuncDesc)) { return false; } - if (!FunctionRegistry.isNativeFuncExpr((ExprNodeGenericFuncDesc)expr)) { + if (!FunctionRegistry.isBuiltInFuncExpr((ExprNodeGenericFuncDesc) expr)) { return true; } for (ExprNodeDesc child : expr.getChildren()) { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java URL: 
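
// A self-contained sketch (not from this commit) of the guard the PartitionPruner change
// above tightens: an expression disqualifies pruning as soon as it contains a function
// that is not a Hive built-in. The wrapper class and method are invented for
// illustration; only isBuiltInFuncExpr comes from the patch.
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;

public class BuiltInGuardSketch {
  static boolean containsNonBuiltIn(ExprNodeDesc expr) {
    if (!(expr instanceof ExprNodeGenericFuncDesc)) {
      return false;
    }
    if (!FunctionRegistry.isBuiltInFuncExpr((ExprNodeGenericFuncDesc) expr)) {
      return true;
    }
    if (expr.getChildren() != null) {
      for (ExprNodeDesc child : expr.getChildren()) {
        if (containsNonBuiltIn(child)) {
          return true;
        }
      }
    }
    return false;
  }
}
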
http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java Wed Feb 18 22:28:35 2015 @@ -866,49 +866,30 @@ public class CalcitePlanner extends Sema ASTNode tabref = getQB().getAliases().isEmpty() ? null : getQB().getParseInfo() .getSrcForAlias(getQB().getAliases().get(0)); - for (Map.Entry lEntry : leftmap.entrySet()) { - String field = lEntry.getKey(); + + // 3. construct Union Output RR using original left & right Input + RowResolver unionoutRR = new RowResolver(); + + Iterator> lIter = leftmap.entrySet().iterator(); + Iterator> rIter = rightmap.entrySet().iterator(); + while (lIter.hasNext()) { + Map.Entry lEntry = lIter.next(); + Map.Entry rEntry = rIter.next(); ColumnInfo lInfo = lEntry.getValue(); - ColumnInfo rInfo = rightmap.get(field); - if (rInfo == null) { - throw new SemanticException(SemanticAnalyzer.generateErrorMessage(tabref, - "Schema of both sides of union should match. " + rightalias - + " does not have the field " + field)); - } - if (lInfo == null) { - throw new SemanticException(SemanticAnalyzer.generateErrorMessage(tabref, - "Schema of both sides of union should match. " + leftalias - + " does not have the field " + field)); - } - if (!lInfo.getInternalName().equals(rInfo.getInternalName())) { - throw new CalciteSemanticException(SemanticAnalyzer.generateErrorMessage( - tabref, - "Schema of both sides of union should match: field " + field + ":" - + " appears on the left side of the UNION at column position: " - + SemanticAnalyzer.getPositionFromInternalName(lInfo.getInternalName()) - + ", and on the right side of the UNION at column position: " - + SemanticAnalyzer.getPositionFromInternalName(rInfo.getInternalName()) - + ". Column positions should match for a UNION")); - } - // try widening coversion, otherwise fail union + ColumnInfo rInfo = rEntry.getValue(); + + String field = lEntry.getKey(); + // try widening conversion, otherwise fail union TypeInfo commonTypeInfo = FunctionRegistry.getCommonClassForUnionAll(lInfo.getType(), rInfo.getType()); if (commonTypeInfo == null) { - throw new CalciteSemanticException(SemanticAnalyzer.generateErrorMessage(tabref, - "Schema of both sides of union should match: Column " + field + " is of type " - + lInfo.getType().getTypeName() + " on first table and type " - + rInfo.getType().getTypeName() + " on second table")); + throw new SemanticException(generateErrorMessage(tabref, + "Schema of both sides of union should match: Column " + field + + " is of type " + lInfo.getType().getTypeName() + + " on first table and type " + rInfo.getType().getTypeName() + + " on second table")); } - } - - // 3. 
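
// A minimal sketch (not part of this patch) of the widening rule the union handling
// below relies on: the output type of each UNION ALL column is the common type returned
// by FunctionRegistry.getCommonClassForUnionAll, and planning fails when no such type
// exists. Class and method names are hypothetical; the error text echoes the message in
// the patch.
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class UnionTypeSketch {
  static TypeInfo unionColumnType(String field, TypeInfo left, TypeInfo right)
      throws SemanticException {
    TypeInfo common = FunctionRegistry.getCommonClassForUnionAll(left, right);
    if (common == null) {
      throw new SemanticException("Schema of both sides of union should match: Column "
          + field + " is of type " + left.getTypeName() + " on first table and type "
          + right.getTypeName() + " on second table");
    }
    return common;
  }
}
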
construct Union Output RR using original left & right Input - RowResolver unionoutRR = new RowResolver(); - for (Map.Entry lEntry : leftmap.entrySet()) { - String field = lEntry.getKey(); - ColumnInfo lInfo = lEntry.getValue(); - ColumnInfo rInfo = rightmap.get(field); ColumnInfo unionColInfo = new ColumnInfo(lInfo); - unionColInfo.setTabAlias(unionalias); unionColInfo.setType(FunctionRegistry.getCommonClassForUnionAll(lInfo.getType(), rInfo.getType())); unionoutRR.put(unionalias, field, unionColInfo); @@ -2644,6 +2625,7 @@ public class CalcitePlanner extends Sema LOG.debug("Created Plan for Query Block " + qb.getId()); } + setQB(qb); return srcRel; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java Wed Feb 18 22:28:35 2015 @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionUtils; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.hooks.E import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc; +import org.apache.hadoop.hive.ql.plan.ReloadFunctionDesc; import org.apache.hadoop.hive.ql.plan.DropFunctionDesc; import org.apache.hadoop.hive.ql.plan.FunctionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -53,11 +55,12 @@ public class FunctionSemanticAnalyzer ex @Override public void analyzeInternal(ASTNode ast) throws SemanticException { - if (ast.getToken().getType() == HiveParser.TOK_CREATEFUNCTION) { + if (ast.getType() == HiveParser.TOK_CREATEFUNCTION) { analyzeCreateFunction(ast); - } - if (ast.getToken().getType() == HiveParser.TOK_DROPFUNCTION) { - analyzeDropFunction(ast); + } else if (ast.getType() == HiveParser.TOK_DROPFUNCTION) { + analyzeDropFunction(ast); + } else if (ast.getType() == HiveParser.TOK_RELOADFUNCTION) { + rootTasks.add(TaskFactory.get(new FunctionWork(new ReloadFunctionDesc()), conf)); } LOG.info("analyze done"); @@ -93,13 +96,16 @@ public class FunctionSemanticAnalyzer ex boolean throwException = !ifExists && !HiveConf.getBoolVar(conf, ConfVars.DROPIGNORESNONEXISTENT); - if (FunctionRegistry.getFunctionInfo(functionName) == null) { + FunctionInfo info = FunctionRegistry.getFunctionInfo(functionName); + if (info == null) { if (throwException) { throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg(functionName)); } else { // Fail silently return; } + } else if (info.isBuiltIn()) { + throw new SemanticException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName)); } boolean isTemporaryFunction = (ast.getFirstChildWithType(HiveParser.TOK_TEMPORARY) != null); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g URL: 
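
// For reference, a sketch (not from this commit) of the new DROP FUNCTION guard added
// above: dropping is rejected outright when the resolved FunctionInfo reports itself as
// built in. The wrapper class and method are invented for illustration.
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class DropFunctionGuardSketch {
  static void checkDroppable(String functionName) throws SemanticException {
    FunctionInfo info = FunctionRegistry.getFunctionInfo(functionName);
    if (info != null && info.isBuiltIn()) {
      throw new SemanticException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
    }
  }
}
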
http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g Wed Feb 18 22:28:35 2015 @@ -297,6 +297,7 @@ KW_REWRITE : 'REWRITE'; KW_AUTHORIZATION: 'AUTHORIZATION'; KW_CONF: 'CONF'; KW_VALUES: 'VALUES'; +KW_RELOAD: 'RELOAD'; // Operators // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work. Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1660751&r1=1660750&r2=1660751&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Wed Feb 18 22:28:35 2015 @@ -216,6 +216,7 @@ TOK_STRINGLITERALSEQUENCE; TOK_CHARSETLITERAL; TOK_CREATEFUNCTION; TOK_DROPFUNCTION; +TOK_RELOADFUNCTION; TOK_CREATEMACRO; TOK_DROPMACRO; TOK_TEMPORARY; @@ -702,6 +703,7 @@ ddlStatement | createIndexStatement | dropIndexStatement | dropFunctionStatement + | reloadFunctionStatement | dropMacroStatement | analyzeStatement | lockStatement @@ -1625,6 +1627,11 @@ dropFunctionStatement -> ^(TOK_DROPFUNCTION functionIdentifier ifExists?) ; +reloadFunctionStatement +@init { pushMsg("reload function statement", state); } +@after { popMsg(state); } + : KW_RELOAD KW_FUNCTION -> ^(TOK_RELOADFUNCTION); + createMacroStatement @init { pushMsg("create macro statement", state); } @after { popMsg(state); } @@ -2277,8 +2284,8 @@ insertClause @after { popMsg(state); } : KW_INSERT KW_OVERWRITE destination ifNotExists? -> ^(TOK_DESTINATION destination ifNotExists?) - | KW_INSERT KW_INTO KW_TABLE? tableOrPartition - -> ^(TOK_INSERT_INTO tableOrPartition) + | KW_INSERT KW_INTO KW_TABLE? tableOrPartition (LPAREN targetCols=columnNameList RPAREN)? + -> ^(TOK_INSERT_INTO tableOrPartition $targetCols?) ; destination
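
// A hypothetical end-to-end use of the two grammar additions above, driven over JDBC.
// This snippet is not part of the commit; the connection URL and the table/column names
// are placeholders, and whether the column-list INSERT is accepted depends on the rest
// of this patch being in place on the server.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ReloadFunctionSketch {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      // RELOAD FUNCTION: pick up permanent functions registered from other sessions.
      stmt.execute("RELOAD FUNCTION");
      // insertClause now optionally accepts a target column list.
      stmt.execute("INSERT INTO TABLE dst (a, b) SELECT x, y FROM src");
    }
  }
}
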