From: xuefu@apache.org
To: commits@hive.apache.org
Reply-To: hive-dev@hive.apache.org
Subject: svn commit: r1663817 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
Date: Tue, 03 Mar 2015 22:35:46 -0000
Message-Id: <20150303223546.E3EBDAC0044@hades.apache.org>

Author: xuefu
Date: Tue Mar  3 22:35:46 2015
New Revision: 1663817

URL: http://svn.apache.org/r1663817
Log:
HIVE-9830: Map join could dump a small table multiple times [Spark Branch] (Jimmy via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1663817&r1=1663816&r2=1663817&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Tue Mar  3 22:35:46 2015
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.BufferedOutputStream;
-import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -81,8 +80,18 @@ public class SparkHashTableSinkOperator
       if (mapJoinTables == null || mapJoinTables.length < tag
           || mapJoinTables[tag] == null) {
         LOG.debug("mapJoinTable is null");
+      } else if (abort) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Aborting, skip dumping side-table for tag: " + tag);
+        }
       } else {
-        flushToFile(mapJoinTables[tag], tag);
+        String method = PerfLogger.SPARK_FLUSH_HASHTABLE + getName();
+        perfLogger.PerfLogBegin(CLASS_NAME, method);
+        try {
+          flushToFile(mapJoinTables[tag], tag);
+        } finally {
+          perfLogger.PerfLogEnd(CLASS_NAME, method);
+        }
       }
       super.closeOp(abort);
     } catch (HiveException e) {
@@ -93,8 +102,7 @@ public class SparkHashTableSinkOperator
   }
 
   protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
-      byte tag) throws IOException, HiveException {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
+      byte tag) throws Exception {
     MapredLocalWork localWork = getExecContext().getLocalWork();
     BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
     Path inputPath = getExecContext().getCurrentInputPath();
@@ -136,7 +144,6 @@ public class SparkHashTableSinkOperator
     htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
         + " with group count: " + tableContainer.size() + " into file: " + path);
     // get the hashtable file and path
-    // get the hashtable file and path
     OutputStream os = null;
     ObjectOutputStream out = null;
     try {
@@ -144,6 +151,18 @@ public class SparkHashTableSinkOperator
       out = new ObjectOutputStream(new BufferedOutputStream(os, 4096));
       MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag];
       mapJoinTableSerde.persist(out, tableContainer);
+      FileStatus status = fs.getFileStatus(path);
+      htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
+          + " (" + status.getLen() + " bytes)");
+    } catch (Exception e) {
+      // Failed to dump the side-table, remove the partial file
+      try {
+        fs.delete(path, false);
+      } catch (Exception ex) {
+        LOG.warn("Got exception in deleting partial side-table dump for tag: "
+            + tag + ", file " + path, ex);
+      }
+      throw e;
     } finally {
       if (out != null) {
         out.close();
@@ -152,10 +171,6 @@ public class SparkHashTableSinkOperator
       }
     }
     tableContainer.clear();
-    FileStatus status = fs.getFileStatus(path);
-    htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
-        + " (" + status.getLen() + " bytes)");
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
   }
 
   public void setTag(byte tag) {
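
For readers skimming the diff: the fix combines three idioms. closeOp() now skips the side-table dump entirely when the operator is aborting; the dump is bracketed by PerfLogBegin/PerfLogEnd in a try/finally so the timer is closed even if flushToFile() throws; and a failed dump deletes the partial file before rethrowing, so no downstream map-join task can load a truncated hashtable. The sketch below isolates that last write-then-clean-up idiom against the Hadoop FileSystem API; it is a minimal illustration only, and the names PartialDumpCleanup and writeOrCleanUp are made up for this example, not part of the commit.

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PartialDumpCleanup {
      // Serializes 'table' to 'path'; if the write fails part-way, deletes
      // the partial file and rethrows the original exception. Mirrors the
      // idiom added to SparkHashTableSinkOperator.flushToFile().
      static void writeOrCleanUp(FileSystem fs, Path path, Serializable table)
          throws Exception {
        ObjectOutputStream out = null;
        try {
          out = new ObjectOutputStream(
              new BufferedOutputStream(fs.create(path), 4096));
          out.writeObject(table); // may die mid-stream, leaving a partial file
        } catch (Exception e) {
          try {
            fs.delete(path, false); // non-recursive: remove only the partial dump
          } catch (IOException ex) {
            // Deletion failure is secondary; swallow it so the original write
            // failure is the exception that propagates to the caller.
            System.err.println("Could not delete partial file " + path + ": " + ex);
          }
          throw e;
        } finally {
          if (out != null) {
            out.close();
          }
        }
      }
    }

As in the patched flushToFile(), the success-path log line (file size via fs.getFileStatus()) belongs inside the try block, so it is printed only after the persist has actually completed.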