hive-commits mailing list archives

From sze...@apache.org
Subject [06/44] hive git commit: HIVE-9830: Map join could dump a small table multiple times [Spark Branch] (Jimmy via Xuefu)
Date Thu, 23 Apr 2015 02:34:15 GMT
HIVE-9830: Map join could dump a small table multiple times [Spark Branch] (Jimmy via Xuefu)

git-svn-id: https://svn.apache.org/repos/asf/hive/branches/spark@1663817 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a5b38fb9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a5b38fb9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a5b38fb9

Branch: refs/heads/master
Commit: a5b38fb98f95c35ae80e1bc574cd78cbcb7bf348
Parents: 347b845
Author: Xuefu Zhang <xuefu@apache.org>
Authored: Tue Mar 3 22:35:46 2015 +0000
Committer: Szehon Ho <szehon@cloudera.com>
Committed: Wed Apr 22 19:33:49 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/SparkHashTableSinkOperator.java     | 33 ++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a5b38fb9/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 39ffda5..c3ba21a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.BufferedOutputStream;
-import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -85,8 +84,18 @@ public class SparkHashTableSinkOperator
       if (mapJoinTables == null || mapJoinTables.length < tag
           || mapJoinTables[tag] == null) {
         LOG.debug("mapJoinTable is null");
+      } else if (abort) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Aborting, skip dumping side-table for tag: " + tag);
+        }
       } else {
-        flushToFile(mapJoinTables[tag], tag);
+        String method = PerfLogger.SPARK_FLUSH_HASHTABLE + getName();
+        perfLogger.PerfLogBegin(CLASS_NAME, method);
+        try {
+          flushToFile(mapJoinTables[tag], tag);
+        } finally {
+          perfLogger.PerfLogEnd(CLASS_NAME, method);
+        }
       }
       super.closeOp(abort);
     } catch (HiveException e) {
@@ -97,8 +106,7 @@ public class SparkHashTableSinkOperator
   }
 
   protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
-      byte tag) throws IOException, HiveException {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
+      byte tag) throws Exception {
     MapredLocalWork localWork = getExecContext().getLocalWork();
     BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
     Path inputPath = getExecContext().getCurrentInputPath();
@@ -140,7 +148,6 @@ public class SparkHashTableSinkOperator
     htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
       + " with group count: " + tableContainer.size() + " into file: " + path);
     // get the hashtable file and path
-    // get the hashtable file and path
     OutputStream os = null;
     ObjectOutputStream out = null;
     try {
@@ -148,6 +155,18 @@ public class SparkHashTableSinkOperator
       out = new ObjectOutputStream(new BufferedOutputStream(os, 4096));
       MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag];
       mapJoinTableSerde.persist(out, tableContainer);
+      FileStatus status = fs.getFileStatus(path);
+      htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
+        + " (" + status.getLen() + " bytes)");
+    } catch (Exception e) {
+      // Failed to dump the side-table, remove the partial file
+      try {
+        fs.delete(path, false);
+      } catch (Exception ex) {
+        LOG.warn("Got exception in deleting partial side-table dump for tag: "
+          + tag + ", file " + path, ex);
+      }
+      throw e;
     } finally {
       if (out != null) {
         out.close();
@@ -156,10 +175,6 @@ public class SparkHashTableSinkOperator
       }
     }
     tableContainer.clear();
-    FileStatus status = fs.getFileStatus(path);
-    htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
-      + " (" + status.getLen() + " bytes)");
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
   }
 
   public void setTag(byte tag) {
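----------------------------------------------------------------------

For reference, the shape of this change is easy to see outside Hive. Below is a minimal, self-contained Java sketch of the two patterns the commit applies: closing the timing scope in a finally block so it ends even when the dump fails (mirroring the PerfLogBegin/PerfLogEnd pair moved into try/finally), and deleting a partially written file on failure before rethrowing. It is an illustration only, not Hive code: the class and method names (SideTableDump, dumpSideTable, writeTable) are hypothetical, plain java.nio.file stands in for Hadoop's FileSystem API, and System.nanoTime stands in for the PerfLogger.

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class SideTableDump {
  static void dumpSideTable(Map<String, String> table, Path path) throws IOException {
    long start = System.nanoTime(); // stands in for perfLogger.PerfLogBegin
    try {
      writeTable(table, path);
    } finally {
      // Always close the timing scope, even on failure -- the same reason
      // the commit wraps flushToFile in try/finally around PerfLogEnd.
      System.err.printf("dump took %d ms%n", (System.nanoTime() - start) / 1_000_000);
    }
  }

  static void writeTable(Map<String, String> table, Path path) throws IOException {
    try (OutputStream os = Files.newOutputStream(path);
         ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(os, 4096))) {
      out.writeObject(table);
      out.flush(); // flush buffers so the reported size is accurate
      System.out.println("Uploaded 1 file to: " + path + " (" + Files.size(path) + " bytes)");
    } catch (IOException e) {
      // Failed mid-write: the try-with-resources has already closed the
      // streams by the time this catch runs, so remove the partial file
      // before propagating the original error, as the commit does.
      try {
        Files.deleteIfExists(path);
      } catch (IOException ex) {
        e.addSuppressed(ex);
      }
      throw e;
    }
  }

  public static void main(String[] args) throws IOException {
    dumpSideTable(Map.of("k", "v"), Path.of("side-table.bin"));
  }
}

One design note visible in the diff: reporting the file size and ending the perf scope used to happen after the try/finally, so a failed dump skipped them and left the partial file behind; moving the cleanup into a catch and the PerfLogEnd into a finally makes both paths consistent.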

