Subject: svn commit: r1104625 - in /hive/trunk: cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/common/ ql/src/java/org/apache/hadoop/hive/ql/exec/
To: commits@hive.apache.org
From: nzhang@apache.org
Date: Tue, 17 May 2011 23:21:21 -0000

Author: nzhang
Date: Tue May 17 23:21:20 2011
New Revision: 1104625

URL: http://svn.apache.org/viewvc?rev=1104625&view=rev
Log:
HIVE-243.
^C breaks out of running query, but not whole CLI (George Djabarov via Ning Zhang)

Added:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java
Modified:
    hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Modified: hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java?rev=1104625&r1=1104624&r2=1104625&view=diff
==============================================================================
--- hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (original)
+++ hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java Tue May 17 23:21:20 2011
@@ -43,12 +43,14 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.parse.ParseDriver;
@@ -61,6 +63,10 @@ import org.apache.hadoop.hive.service.Hi
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.thrift.TException;
 
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
+
 /**
  * CliDriver.
* @@ -155,48 +161,48 @@ public class CliDriver { } } } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server - HiveClient client = ss.getClient(); - PrintStream out = ss.out; - PrintStream err = ss.err; + HiveClient client = ss.getClient(); + PrintStream out = ss.out; + PrintStream err = ss.err; - try { - client.execute(cmd_trimmed); - List results; - do { - results = client.fetchN(LINES_TO_FETCH); - for (String line: results) { - out.println(line); - } - } while (results.size() == LINES_TO_FETCH); - } catch (HiveServerException e) { - ret = e.getErrorCode(); - if (ret != 0) { // OK if ret == 0 -- reached the EOF - String errMsg = e.getMessage(); - if (errMsg == null) { - errMsg = e.toString(); - } - ret = e.getErrorCode(); - err.println("[Hive Error]: " + errMsg); + try { + client.execute(cmd_trimmed); + List results; + do { + results = client.fetchN(LINES_TO_FETCH); + for (String line : results) { + out.println(line); } - } catch (TException e) { + } while (results.size() == LINES_TO_FETCH); + } catch (HiveServerException e) { + ret = e.getErrorCode(); + if (ret != 0) { // OK if ret == 0 -- reached the EOF String errMsg = e.getMessage(); if (errMsg == null) { errMsg = e.toString(); } - ret = -10002; - err.println("[Thrift Error]: " + errMsg); - } finally { - try { - client.clean(); - } catch (TException e) { - String errMsg = e.getMessage(); - if (errMsg == null) { - errMsg = e.toString(); - } - err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: " - + errMsg); + ret = e.getErrorCode(); + err.println("[Hive Error]: " + errMsg); + } + } catch (TException e) { + String errMsg = e.getMessage(); + if (errMsg == null) { + errMsg = e.toString(); + } + ret = -10002; + err.println("[Thrift Error]: " + errMsg); + } finally { + try { + client.clean(); + } catch (TException e) { + String errMsg = e.getMessage(); + if (errMsg == null) { + errMsg = e.toString(); } + err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: " + + errMsg); } + } } else { // local mode CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf); int tryCount = 0; @@ -284,32 +290,88 @@ public class CliDriver { } public int processLine(String line) { - int lastRet = 0, ret = 0; + return processLine(line, false); + } - String command = ""; - for (String oneCmd : line.split(";")) { + /** + * Processes a line of semicolon separated commands + * + * @param line + * The commands to process + * @param allowInterupting + * When true the function will handle SIG_INT (Ctrl+C) by interrupting the processing and + * returning -1 + * @return + */ + public int processLine(String line, boolean allowInterupting) { + SignalHandler oldSignal = null; + Signal interupSignal = null; + + if (allowInterupting) { + // Remember all threads that were running at the time we started line processing. 
+ // Hook up the custom Ctrl+C handler while processing this line + interupSignal = new Signal("INT"); + oldSignal = Signal.handle(interupSignal, new SignalHandler() { + private final Thread cliThread = Thread.currentThread(); + private boolean interruptRequested; + + @Override + public void handle(Signal signal) { + boolean initialRequest = !interruptRequested; + interruptRequested = true; + + // Kill the VM on second ctrl+c + if (!initialRequest) { + console.printInfo("Exiting the JVM"); + System.exit(127); + } - if (StringUtils.endsWith(oneCmd, "\\")) { - command += StringUtils.chop(oneCmd) + ";"; - continue; - } else { - command += oneCmd; - } - if (StringUtils.isBlank(command)) { - continue; - } + // Interrupt the CLI thread to stop the current statement and return + // to prompt + console.printInfo("Interrupting... Be patient, this might take some time."); + console.printInfo("Press Ctrl+C again to kill JVM"); + + // First, kill any running MR jobs + HadoopJobExecHelper.killRunningJobs(); + HiveInterruptUtils.interrupt(); + this.cliThread.interrupt(); + } + }); + } - ret = processCmd(command); - command = ""; - lastRet = ret; - boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS); - if (ret != 0 && !ignoreErrors) { - CommandProcessorFactory.clean((HiveConf)conf); - return ret; + try { + int lastRet = 0, ret = 0; + + String command = ""; + for (String oneCmd : line.split(";")) { + + if (StringUtils.endsWith(oneCmd, "\\")) { + command += StringUtils.chop(oneCmd) + ";"; + continue; + } else { + command += oneCmd; + } + if (StringUtils.isBlank(command)) { + continue; + } + + ret = processCmd(command); + command = ""; + lastRet = ret; + boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS); + if (ret != 0 && !ignoreErrors) { + CommandProcessorFactory.clean((HiveConf) conf); + return ret; + } + } + CommandProcessorFactory.clean((HiveConf) conf); + return lastRet; + } finally { + // Once we are done processing the line, restore the old handler + if (oldSignal != null && interupSignal != null) { + Signal.handle(interupSignal, oldSignal); } } - CommandProcessorFactory.clean((HiveConf)conf); - return lastRet; } public int processReader(BufferedReader r) throws IOException { @@ -528,7 +590,7 @@ public class CliDriver { } if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) { line = prefix + line; - ret = cli.processLine(line); + ret = cli.processLine(line, true); prefix = ""; curPrompt = prompt; } else { Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java?rev=1104625&view=auto ============================================================================== --- hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java (added) +++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java Tue May 17 23:21:20 2011 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +public interface HiveInterruptCallback { + /** + * Request interrupting of the processing + */ + void interrupt(); +} Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java?rev=1104625&view=auto ============================================================================== --- hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java (added) +++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java Tue May 17 23:21:20 2011 @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import java.util.ArrayList; +import java.util.List; + +public class HiveInterruptUtils { + + /** + * A list of currently running comments that needs cleanup when the command is canceled + */ + private static List interruptCallbacks = new ArrayList(); + + public static HiveInterruptCallback add(HiveInterruptCallback command) { + synchronized (interruptCallbacks) { + interruptCallbacks.add(command); + } + return command; + } + + public static HiveInterruptCallback remove(HiveInterruptCallback command) { + synchronized (interruptCallbacks) { + interruptCallbacks.remove(command); + } + return command; + } + + /** + * Request interruption of current hive command + */ + public static void interrupt() { + synchronized (interruptCallbacks) { + for (HiveInterruptCallback resource : new ArrayList(interruptCallbacks)) { + resource.interrupt(); + } + } + } + + /** + * Checks if the current thread has been interrupted and throws RuntimeException is it has. 
+ */ + public static void checkInterrupted() { + if (Thread.currentThread().isInterrupted()) { + InterruptedException interrupt = null; + try { + Thread.sleep(0); + } catch (InterruptedException e) { + interrupt = e; + } + throw new RuntimeException("Interuppted", interrupt); + } + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1104625&r1=1104624&r2=1104625&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Tue May 17 23:21:20 2011 @@ -158,29 +158,33 @@ public class HadoopJobExecHelper { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - synchronized (runningJobKillURIs) { - for (String uri : runningJobKillURIs.values()) { - try { - System.err.println("killing job with: " + uri); - java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri) - .openConnection(); - conn.setRequestMethod("POST"); - int retCode = conn.getResponseCode(); - if (retCode != 200) { - System.err.println("Got an error trying to kill job with URI: " + uri + " = " - + retCode); - } - } catch (Exception e) { - System.err.println("trying to kill job, caught: " + e); - // do nothing - } - } - } + killRunningJobs(); } }); } } - + + public static void killRunningJobs() { + synchronized (runningJobKillURIs) { + for (String uri : runningJobKillURIs.values()) { + try { + System.err.println("killing job with: " + uri); + java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri) + .openConnection(); + conn.setRequestMethod("POST"); + int retCode = conn.getResponseCode(); + if (retCode != 200) { + System.err.println("Got an error trying to kill job with URI: " + uri + " = " + + retCode); + } + } catch (Exception e) { + System.err.println("trying to kill job, caught: " + e); + // do nothing + } + } + } + } + public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { if (ctrs == null) { // hadoop might return null if it cannot locate the job. 
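The new killRunningJobs() above simply factors the existing shutdown-hook logic into a method the SIGINT handler can also call: it walks the recorded job-kill URIs and issues an HTTP POST to each one. A minimal, self-contained sketch of that pattern follows; the class name and the way the URIs are passed in are illustrative only, since the real code keeps them in the synchronized runningJobKillURIs map keyed by job id.

    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Collection;

    // Sketch of the kill-job pattern used by HadoopJobExecHelper.killRunningJobs():
    // POST to each JobTracker kill URI that was recorded when the job was launched.
    public final class JobKillSketch {

      public static void killAll(Collection<String> killUris) {
        for (String uri : killUris) {
          try {
            System.err.println("killing job with: " + uri);
            HttpURLConnection conn = (HttpURLConnection) new URL(uri).openConnection();
            conn.setRequestMethod("POST");
            int retCode = conn.getResponseCode();
            if (retCode != 200) {
              System.err.println("Got an error trying to kill job with URI: " + uri + " = " + retCode);
            }
          } catch (Exception e) {
            // Best effort: a failed kill request must not abort the shutdown or interrupt path.
            System.err.println("trying to kill job, caught: " + e);
          }
        }
      }
    }

Because the kill logic is now a static method rather than code embedded in the shutdown hook, both the JVM shutdown hook and the new Ctrl+C handler in CliDriver reuse the same path.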
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1104625&r1=1104624&r2=1104625&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue May 17 23:21:20 2011 @@ -85,6 +85,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.HiveInterruptCallback; +import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -1619,99 +1621,98 @@ public final class Utilities { // Process the case when name node call is needed final Map resultMap = new ConcurrentHashMap(); ArrayList> results = new ArrayList>(); - ThreadPoolExecutor executor = null; + final ThreadPoolExecutor executor; int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0); if (pathNeedProcess.size() > 1 && maxThreads > 1) { int numExecutors = Math.min(pathNeedProcess.size(), maxThreads); LOG.info("Using " + numExecutors + " threads for getContentSummary"); executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()); + } else { + executor = null; } - // - Configuration conf = ctx.getConf(); - JobConf jobConf = new JobConf(conf); - for (String path : pathNeedProcess) { - final Path p = new Path(path); - final String pathStr = path; - // All threads share the same Configuration and JobConf based on the - // assumption that they are thread safe if only read operations are - // executed. It is not stated in Hadoop's javadoc, the sourcce codes - // clearly showed that they made efforts for it and we believe it is - // thread safe. Will revisit this piece of codes if we find the assumption - // is not correct. - final Configuration myConf = conf; - final JobConf myJobConf = jobConf; - final PartitionDesc partDesc = work.getPathToPartitionInfo().get( - p.toString()); - Runnable r = new Runnable() { - public void run() { - try { - ContentSummary resultCs; - - Class inputFormatCls = partDesc - .getInputFileFormatClass(); - InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache( - inputFormatCls, myJobConf); - if (inputFormatObj instanceof ContentSummaryInputFormat) { - resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p, - myJobConf); - } else { - FileSystem fs = p.getFileSystem(myConf); - resultCs = fs.getContentSummary(p); + HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() { + @Override + public void interrupt() { + if (executor != null) { + executor.shutdownNow(); + } + } + }); + try { + Configuration conf = ctx.getConf(); + JobConf jobConf = new JobConf(conf); + for (String path : pathNeedProcess) { + final Path p = new Path(path); + final String pathStr = path; + // All threads share the same Configuration and JobConf based on the + // assumption that they are thread safe if only read operations are + // executed. It is not stated in Hadoop's javadoc, the sourcce codes + // clearly showed that they made efforts for it and we believe it is + // thread safe. 
Will revisit this piece of codes if we find the assumption + // is not correct. + final Configuration myConf = conf; + final JobConf myJobConf = jobConf; + final PartitionDesc partDesc = work.getPathToPartitionInfo().get( + p.toString()); + Runnable r = new Runnable() { + public void run() { + try { + ContentSummary resultCs; + + Class inputFormatCls = partDesc + .getInputFileFormatClass(); + InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache( + inputFormatCls, myJobConf); + if (inputFormatObj instanceof ContentSummaryInputFormat) { + resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p, + myJobConf); + } else { + FileSystem fs = p.getFileSystem(myConf); + resultCs = fs.getContentSummary(p); + } + resultMap.put(pathStr, resultCs); + } catch (IOException e) { + // We safely ignore this exception for summary data. + // We don't update the cache to protect it from polluting other + // usages. The worst case is that IOException will always be + // retried for another getInputSummary(), which is fine as + // IOException is not considered as a common case. + LOG.info("Cannot get size of " + pathStr + ". Safely ignored."); } - resultMap.put(pathStr, resultCs); - } catch (IOException e) { - // We safely ignore this exception for summary data. - // We don't update the cache to protect it from polluting other - // usages. The worst case is that IOException will always be - // retried for another getInputSummary(), which is fine as - // IOException is not considered as a common case. - LOG.info("Cannot get size of " + pathStr + ". Safely ignored."); } - } - }; + }; - if (executor == null) { - r.run(); - } else { - Future result = executor.submit(r); - results.add(result); + if (executor == null) { + r.run(); + } else { + Future result = executor.submit(r); + results.add(result); + } } - } - if (executor != null) { - for (Future result : results) { - boolean executorDone = false; - do { - try { - result.get(); - executorDone = true; - } catch (InterruptedException e) { - LOG.info("Interrupted when waiting threads: ", e); - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - throw new IOException(e); - } - } while (!executorDone); + if (executor != null) { + executor.shutdown(); } - executor.shutdown(); - } + HiveInterruptUtils.checkInterrupted(); + for (Map.Entry entry : resultMap.entrySet()) { + ContentSummary cs = entry.getValue(); - for (Map.Entry entry : resultMap.entrySet()) { - ContentSummary cs = entry.getValue(); + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); + ctx.addCS(entry.getKey(), cs); + LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength() + + " file count: " + + cs.getFileCount() + " directory count: " + cs.getDirectoryCount()); + } - ctx.addCS(entry.getKey(), cs); - LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength() - + " file count: " - + cs.getFileCount() + " directory count: " + cs.getDirectoryCount()); + return new ContentSummary(summary[0], summary[1], summary[2]); + } finally { + HiveInterruptUtils.remove(interrup); } - - return new ContentSummary(summary[0], summary[1], summary[2]); } }