hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1663817 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
Date Tue, 03 Mar 2015 22:35:46 GMT
Author: xuefu
Date: Tue Mar  3 22:35:46 2015
New Revision: 1663817

URL: http://svn.apache.org/r1663817
Log:
HIVE-9830: Map join could dump a small table multiple times [Spark Branch] (Jimmy via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1663817&r1=1663816&r2=1663817&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java	(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java	Tue Mar  3 22:35:46 2015
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.BufferedOutputStream;
-import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -81,8 +80,18 @@ public class SparkHashTableSinkOperator
       if (mapJoinTables == null || mapJoinTables.length < tag
           || mapJoinTables[tag] == null) {
         LOG.debug("mapJoinTable is null");
+      } else if (abort) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Aborting, skip dumping side-table for tag: " + tag);
+        }
       } else {
-        flushToFile(mapJoinTables[tag], tag);
+        String method = PerfLogger.SPARK_FLUSH_HASHTABLE + getName();
+        perfLogger.PerfLogBegin(CLASS_NAME, method);
+        try {
+          flushToFile(mapJoinTables[tag], tag);
+        } finally {
+          perfLogger.PerfLogEnd(CLASS_NAME, method);
+        }
       }
       super.closeOp(abort);
     } catch (HiveException e) {
@@ -93,8 +102,7 @@ public class SparkHashTableSinkOperator
   }
 
   protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
-      byte tag) throws IOException, HiveException {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
+      byte tag) throws Exception {
     MapredLocalWork localWork = getExecContext().getLocalWork();
     BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
     Path inputPath = getExecContext().getCurrentInputPath();
@@ -136,7 +144,6 @@ public class SparkHashTableSinkOperator
     htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
       + " with group count: " + tableContainer.size() + " into file: " + path);
     // get the hashtable file and path
-    // get the hashtable file and path
     OutputStream os = null;
     ObjectOutputStream out = null;
     try {
@@ -144,6 +151,18 @@ public class SparkHashTableSinkOperator
       out = new ObjectOutputStream(new BufferedOutputStream(os, 4096));
       MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag];
       mapJoinTableSerde.persist(out, tableContainer);
+      FileStatus status = fs.getFileStatus(path);
+      htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
+        + " (" + status.getLen() + " bytes)");
+    } catch (Exception e) {
+      // Failed to dump the side-table, remove the partial file
+      try {
+        fs.delete(path, false);
+      } catch (Exception ex) {
+        LOG.warn("Got exception in deleting partial side-table dump for tag: "
+          + tag + ", file " + path, ex);
+      }
+      throw e;
     } finally {
       if (out != null) {
         out.close();
@@ -152,10 +171,6 @@ public class SparkHashTableSinkOperator
       }
     }
     tableContainer.clear();
-    FileStatus status = fs.getFileStatus(path);
-    htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
-      + " (" + status.getLen() + " bytes)");
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
   }
 
   public void setTag(byte tag) {



Mime
View raw message