hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gunt...@apache.org
Subject svn commit: r1535944 - /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
Date Sat, 26 Oct 2013 08:10:28 GMT
Author: gunther
Date: Sat Oct 26 08:10:28 2013
New Revision: 1535944

URL: http://svn.apache.org/r1535944
Log:
HIVE-5639: Allow caching of Orc footers in Tez AM (addendum) (Siddharth Seth via Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1535944&r1=1535943&r2=1535944&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Sat Oct 26 08:10:28 2013
@@ -333,6 +333,7 @@ public class OrcInputFormat  implements 
     private final boolean cacheStripeDetails;
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
     private final AtomicInteger numFilesCounter = new AtomicInteger(0);
+    private Throwable fatalError = null;
 
     /**
      * A count of the number of threads that may create more work for the
@@ -388,10 +389,14 @@ public class OrcInputFormat  implements 
      * @param runnable the object to run
      */
     synchronized void schedule(Runnable runnable) {
-      if (runnable instanceof FileGenerator) {
-        schedulers += 1;
+      if (fatalError == null) {
+        if (runnable instanceof FileGenerator || runnable instanceof SplitGenerator) {
+          schedulers += 1;
+        }
+        threadPool.execute(runnable);
+      } else {
+        throw new RuntimeException("serious problem", fatalError);
       }
-      threadPool.execute(runnable);
     }
 
     /**
@@ -404,6 +409,11 @@ public class OrcInputFormat  implements 
       }
     }
 
+    synchronized void notifyOnNonIOException(Throwable th) {
+      fatalError = th;
+      notify();
+    }
+
     /**
      * Wait until all of the tasks are done. It waits until all of the
      * threads that may create more work are done and then shuts down the
@@ -413,6 +423,10 @@ public class OrcInputFormat  implements 
       try {
         while (schedulers != 0) {
           wait();
+          if (fatalError != null) {
+            threadPool.shutdownNow();
+            throw new RuntimeException("serious problem", fatalError);
+          }
         }
         threadPool.shutdown();
         threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@@ -457,13 +471,19 @@ public class OrcInputFormat  implements 
           }
         }
         // mark the fact that we are done
-        context.decrementSchedulers();
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS);
       } catch (Throwable th) {
-        context.decrementSchedulers();
+        if (!(th instanceof IOException)) {
+          LOG.error("Unexpected Exception", th);
+        }
         synchronized (context.errors) {
           context.errors.add(th);
         }
+        if (!(th instanceof IOException)) {
+          context.notifyOnNonIOException(th);
+        }
+      } finally {
+        context.decrementSchedulers();
       }
     }
 
@@ -643,9 +663,17 @@ public class OrcInputFormat  implements 
           createSplit(currentOffset, currentLength, fileMetaInfo);
         }
       } catch (Throwable th) {
+        if (!(th instanceof IOException)) {
+          LOG.error("Unexpected Exception", th);
+        }
         synchronized (context.errors) {
           context.errors.add(th);
         }
+        if (!(th instanceof IOException)) {
+          context.notifyOnNonIOException(th);
+        }
+      } finally {
+        context.decrementSchedulers();
       }
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS);
     }
@@ -678,9 +706,15 @@ public class OrcInputFormat  implements 
           }
         }
       } catch (Throwable th) {
+        if (!(th instanceof IOException)) {
+          LOG.error("Unexpected Exception", th);
+        }
         synchronized (context.errors) {
           context.errors.add(th);
         }
+        if (!(th instanceof IOException)) {
+          context.notifyOnNonIOException(th);
+        }
       }
     }
 
@@ -705,7 +739,7 @@ public class OrcInputFormat  implements 
         if (th instanceof IOException) {
           errors.add((IOException) th);
         } else {
-          throw new IOException("serious problem", th);
+          throw new RuntimeException("serious problem", th);
         }
       }
       throw new InvalidInputException(errors);



Mime
View raw message