hive-commits mailing list archives

From nzh...@apache.org
Subject svn commit: r1130427 - in /hive/trunk: conf/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ shims/src/0.20/java/org/apache/hadoop/hive/shims/ shims/src/0.20S/java/org/apache/hadoop/hive/shims/ shims/src/commo...
Date Thu, 02 Jun 2011 07:52:45 GMT
Author: nzhang
Date: Thu Jun  2 07:52:45 2011
New Revision: 1130427

URL: http://svn.apache.org/viewvc?rev=1130427&view=rev
Log:
HIVE-2154. add exception handling to hive's record reader (Yongqiang He via Ning Zhang)

Added:
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java
Modified:
    hive/trunk/conf/hive-default.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java

Modified: hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml (original)
+++ hive/trunk/conf/hive-default.xml Thu Jun  2 07:52:45 2011
@@ -1118,4 +1118,12 @@
    of data sampled.</description>
 </property>
 
+<property>
+	<name>hive.io.exception.handlers</name>
+	<value></value>
+	<description>A list of IO exception handler class names. This is used
+		to construct a list of exception handlers to handle exceptions thrown
+		by record readers.</description>
+</property>
+
 </configuration>
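
As a side note, the same handler list can be supplied programmatically through the job configuration. A minimal sketch in Java; the com.example handler class names are hypothetical placeholders, not part of this commit:

    import org.apache.hadoop.mapred.JobConf;

    public class HandlerConfigExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Comma-separated list of handler class names, read by
        // HiveIOExceptionHandlerChain.getHiveIOExceptionHandlerChain().
        // The com.example classes are hypothetical placeholders.
        conf.set("hive.io.exception.handlers",
            "com.example.SkipCorruptFileHandler,com.example.LoggingHandler");
      }
    }

Handlers that cannot be loaded from this list are silently skipped when the chain is built (see HiveIOExceptionHandlerChain below).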

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java Thu Jun  2 07:52:45 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -40,10 +41,10 @@ public class BucketizedHiveRecordReader<
   protected final InputFormat inputFormat;
   protected final JobConf jobConf;
   protected final Reporter reporter;
-  protected RecordReader curReader;
+  protected RecordReader<K,V> curReader;
   protected long progress;
   protected int idx;
-
+  
   public BucketizedHiveRecordReader(InputFormat inputFormat,
       BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf,
       Reporter reporter) throws IOException {
@@ -86,13 +87,21 @@ public class BucketizedHiveRecordReader<
   }
 
   public boolean doNext(K key, V value) throws IOException {
-    while ((curReader == null) || !curReader.next(key, value)) {
+    while ((curReader == null) || !doNextWithExceptionHandler(key, value)) {
       if (!initNextRecordReader()) {
         return false;
       }
     }
     return true;
   }
+  
+  private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+    try {
+      return curReader.next(key, value);
+    } catch (Exception e) {
+      return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
+    }
+  }
 
   /**
    * Get the record reader for the next chunk in this
@@ -117,9 +126,11 @@ public class BucketizedHiveRecordReader<
       curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
           reporter);
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jobConf);
     }
     idx++;
     return true;
   }
+
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Jun  2 07:52:45 2011
@@ -33,6 +33,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -171,7 +174,7 @@ public class HiveInputFormat<K extends W
   }
 
   JobConf job;
-
+  
   public void configure(JobConf job) {
     this.job = job;
   }
@@ -204,7 +207,7 @@ public class HiveInputFormat<K extends W
       Reporter reporter) throws IOException {
 
     HiveInputSplit hsplit = (HiveInputSplit) split;
-
+    
     InputSplit inputSplit = hsplit.getInputSplit();
     String inputFormatClassName = null;
     Class inputFormatClass = null;
@@ -234,14 +237,19 @@ public class HiveInputFormat<K extends W
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
         cloneJobConf);
-    RecordReader innerReader = inputFormat.getRecordReader(inputSplit,
+    RecordReader innerReader = null;
+    try {
+      innerReader = inputFormat.getRecordReader(inputSplit,
         cloneJobConf, reporter);
-
-    HiveRecordReader<K,V> rr = new HiveRecordReader(innerReader);
+    } catch (Exception e) {
+      innerReader = HiveIOExceptionHandlerUtil
+          .handleRecordReaderCreationException(e, cloneJobConf);
+    }
+    HiveRecordReader<K,V> rr = new HiveRecordReader(innerReader, job);
     rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
     return rr;
   }
-
+  
   protected Map<String, PartitionDesc> pathToPartitionInfo;
   MapredWork mrwork = null;
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java Thu Jun  2 07:52:45 2011
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.ExecMapper;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 /**
@@ -33,10 +35,19 @@ public class HiveRecordReader<K extends 
     extends HiveContextAwareRecordReader<K, V> {
 
   private final RecordReader recordReader;
+  
+  private JobConf jobConf;
 
-  public HiveRecordReader(RecordReader recordReader) throws IOException {
+  public HiveRecordReader(RecordReader recordReader)
+      throws IOException {
     this.recordReader = recordReader;
   }
+  
+  public HiveRecordReader(RecordReader recordReader, JobConf conf)
+      throws IOException {
+    this.recordReader = recordReader;
+    this.jobConf = conf;
+  }
 
   public void doClose() throws IOException {
     recordReader.close();
@@ -63,7 +74,11 @@ public class HiveRecordReader<K extends 
     if (ExecMapper.getDone()) {
       return false;
     }
-    return recordReader.next(key, value);
+    try {
+      return recordReader.next(key, value);
+    } catch (Exception e) {
+      return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
+    }
   }
 
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java Thu Jun  2 07:52:45 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.LongWritable;
@@ -105,9 +106,19 @@ public class SymlinkTextInputFormat exte
     // The target data is in TextInputFormat.
     TextInputFormat inputFormat = new TextInputFormat();
     inputFormat.configure(job);
-    return inputFormat.getRecordReader(targetSplit, job, reporter);
+    RecordReader innerReader = null;
+    try {
+      innerReader = inputFormat.getRecordReader(targetSplit, job,
+          reporter);
+    } catch (Exception e) {
+      innerReader = HiveIOExceptionHandlerUtil
+          .handleRecordReaderCreationException(e, job);
+    }
+    HiveRecordReader rr = new HiveRecordReader(innerReader, job);
+    rr.initIOContext((FileSplit)targetSplit, job, TextInputFormat.class, innerReader);
+    return rr;
   }
-
+  
   /**
    * Parses all target paths from job input directory which contains symlink
    * files, and splits the target data using TextInputFormat.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Thu Jun  2 07:52:45 2011
@@ -396,9 +396,10 @@ public class BucketMapJoinOptimizer impl
           }
         } else {
           int jump = smallTblBucketNum / bigTblBucketNum;
+          List<String> bucketNames = aliasToBucketFileNamesMapping.get(alias);
          for (int i = index; i < aliasToBucketFileNamesMapping.get(alias).size(); i = i + jump) {
             if(i <= aliasToBucketFileNamesMapping.get(alias).size()) {
-              resultFileNames.add(aliasToBucketFileNamesMapping.get(alias).get(i));
+              resultFileNames.add(bucketNames.get(i));
             }
           }
         }

Modified: hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Jun  2 07:52:45 2011
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
@@ -209,10 +211,12 @@ public class Hadoop20Shims implements Ha
     protected RecordReader<K, V> curReader;
     protected boolean isShrinked;
     protected long shrinkedLength;
-
+    
     public boolean next(K key, V value) throws IOException {
 
-      while ((curReader == null) || !curReader.next((K)((CombineHiveKey)key).getKey(), value)) {
+      while ((curReader == null)
+          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
+              value)) {
         if (!initNextRecordReader(key)) {
           return false;
         }
@@ -283,6 +287,21 @@ public class Hadoop20Shims implements Ha
       }
       initNextRecordReader(null);
     }
+    
+    /**
+     * Calls the current reader's next() and handles any exception raised inside it.
+     * @param key
+     * @param value
+     * @return
+     * @throws IOException
+     */
+    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+      try {
+        return curReader.next(key, value);
+      } catch (Exception e) {
+        return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
+      }
+    }
 
     /**
      * Get the record reader for the next chunk in this CombineFileSplit.
@@ -318,7 +337,7 @@ public class Hadoop20Shims implements Ha
         jc.setLong("map.input.start", split.getOffset(idx));
         jc.setLong("map.input.length", split.getLength(idx));
       } catch (Exception e) {
-        throw new RuntimeException(e);
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
       }
       idx++;
       return true;

Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1130427&r1=1130426&r2=1130427&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Jun  2 07:52:45 2011
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.shims.Hadoop20Shims;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.Hadoop20Shims.InputSplitShim;
@@ -213,11 +215,13 @@ public class Hadoop20SShims implements H
     protected long progress;
     protected RecordReader<K, V> curReader;
     protected boolean isShrinked;
-    protected long shrinkedLength;    
+    protected long shrinkedLength;
 
     public boolean next(K key, V value) throws IOException {
 
-      while ((curReader == null) || !curReader.next((K)((CombineHiveKey)key).getKey(), value)) {
+      while ((curReader == null)
+          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
+              value)) {
         if (!initNextRecordReader(key)) {
           return false;
         }
@@ -288,6 +292,22 @@ public class Hadoop20SShims implements H
       }
       initNextRecordReader(null);
     }
+    
+    /**
+     * Calls the current reader's next() and handles any exception raised inside it.
+     * @param key
+     * @param value
+     * @return
+     * @throws IOException
+     */
+    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
+      try {
+        return curReader.next(key, value);
+      } catch (Exception e) {
+        return HiveIOExceptionHandlerUtil
+            .handleRecordReaderNextException(e, jc);
+      }
+    }
 
     /**
      * Get the record reader for the next chunk in this CombineFileSplit.
@@ -323,7 +343,8 @@ public class Hadoop20SShims implements H
         jc.setLong("map.input.start", split.getOffset(idx));
         jc.setLong("map.input.length", split.getLength(idx));
       } catch (Exception e) {
-        throw new RuntimeException(e);
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(
+            e, jc);
       }
       idx++;
       return true;

Added: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java?rev=1130427&view=auto
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java (added)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandler.java Thu Jun  2 07:52:45 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * HiveIOExceptionHandler defines the interface that all IO exception
+ * handlers in Hive should implement. Different IO exception handlers can
+ * implement different logic based on the exception passed to them.
+ */
+public interface HiveIOExceptionHandler {
+
+  /**
+   * Process exceptions raised when creating a record reader.
+   *
+   * @param e the exception thrown while creating the record reader
+   * @return a replacement record reader, or null when not handled
+   */
+  public RecordReader<?, ?> handleRecordReaderCreationException(Exception e)
+      throws IOException;
+
+  /**
+   * Process exceptions thrown when calling the record reader's next().
+   *
+   * @param e the exception thrown by next()
+   * @param result container recording whether and how the exception was handled
+   * @throws IOException
+   */
+  public void handleRecorReaderNextException(Exception e,
+      HiveIOExceptionNextHandleResult result) throws IOException;
+
+}
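
To illustrate the contract, here is a minimal sketch of a custom handler; SkipEOFHandler and its package are hypothetical, not part of this commit. Note that the next-handler method is spelled handleRecorReaderNextException in the committed interface:

    package com.example;

    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
    import org.apache.hadoop.hive.io.HiveIOExceptionNextHandleResult;
    import org.apache.hadoop.mapred.RecordReader;

    // Hypothetical handler: treats an EOFException from next() as
    // "this file is exhausted" instead of failing the task.
    public class SkipEOFHandler implements HiveIOExceptionHandler {

      public RecordReader<?, ?> handleRecordReaderCreationException(Exception e)
          throws IOException {
        // Returning null tells the chain this handler did not handle the
        // exception, so later handlers (or the chain's re-throw) take over.
        return null;
      }

      // Spelled as in the committed interface ("Recor", not "Record").
      public void handleRecorReaderNextException(Exception e,
          HiveIOExceptionNextHandleResult result) throws IOException {
        if (e instanceof EOFException) {
          result.setHandled(true);       // stop consulting later handlers
          result.setHandleResult(false); // next() reports end of input
        }
        // Leave the result untouched for exceptions we do not recognize.
      }
    }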

Added: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java?rev=1130427&view=auto
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java (added)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerChain.java Thu Jun  2 07:52:45 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An exception handler chain that processes the input exception by going
+ * through the handlers defined in this chain one by one, until either one
+ * handler handles the exception or the end of the chain is reached. If the
+ * end of the chain is reached and still no handler has handled the
+ * exception, it is re-thrown to the caller.
+ */
+public class HiveIOExceptionHandlerChain {
+  
+  public static String HIVE_IO_EXCEPTION_HANDLE_CHAIN = "hive.io.exception.handlers";
+
+  @SuppressWarnings("unchecked")
+  public static HiveIOExceptionHandlerChain getHiveIOExceptionHandlerChain(
+      JobConf conf) {
+    HiveIOExceptionHandlerChain chain = new HiveIOExceptionHandlerChain();
+    String exceptionHandlerStr = conf.get(HIVE_IO_EXCEPTION_HANDLE_CHAIN);
+    List<HiveIOExceptionHandler> handlerChain = new ArrayList<HiveIOExceptionHandler>();
+    if (exceptionHandlerStr != null && !exceptionHandlerStr.trim().equals("")) {
+      String[] handlerArr = exceptionHandlerStr.split(",");
+      if (handlerArr != null && handlerArr.length > 0) {
+        for (String handlerStr : handlerArr) {
+          if (!handlerStr.trim().equals("")) {
+            try {
+              Class<? extends HiveIOExceptionHandler> handlerCls = 
+                (Class<? extends HiveIOExceptionHandler>) Class.forName(handlerStr);
+              HiveIOExceptionHandler handler = ReflectionUtils.newInstance(handlerCls, null);
+              handlerChain.add(handler);
+            } catch (Exception e) { // ignore handlers that cannot be loaded or instantiated
+            }
+          }
+        }
+      }
+    }
+
+    chain.setHandlerChain(handlerChain);
+    return chain;
+  }
+
+  private List<HiveIOExceptionHandler> handlerChain;
+
+  /**
+   * @return the exception handler chain defined
+   */
+  protected List<HiveIOExceptionHandler> getHandlerChain() {
+    return handlerChain;
+  }
+
+  /**
+   * set the exception handler chain
+   * @param handlerChain
+   */
+  protected void setHandlerChain(List<HiveIOExceptionHandler> handlerChain) {
+    this.handlerChain = handlerChain;
+  }
+
+  public RecordReader<?, ?> handleRecordReaderCreationException(Exception e) throws IOException {
+    RecordReader<?, ?> ret = null;
+
+    if (handlerChain != null && handlerChain.size() > 0) {
+      for (HiveIOExceptionHandler handler : handlerChain) {
+        ret = handler.handleRecordReaderCreationException(e);
+        if (ret != null) {
+          return ret;
+        }
+      }
+    }
+
+    //re-throw the exception as an IOException
+    throw new IOException(e);
+  }
+  
+  /**
+   * Handles exceptions raised by a record reader's next(). A
+   * HiveIOExceptionNextHandleResult stores the outcome of each handler: a
+   * handler that handles the exception marks the result as handled and also
+   * sets the handle result. The handle result becomes the return value of
+   * the reader's next(), which determines whether a new file needs to be
+   * opened for reading.
+   */
+  public boolean handleRecordReaderNextException(Exception e)
+      throws IOException {
+    HiveIOExceptionNextHandleResult result = new HiveIOExceptionNextHandleResult();
+    if (handlerChain != null && handlerChain.size() > 0) {
+      for (HiveIOExceptionHandler handler : handlerChain) {
+        handler.handleRecorReaderNextException(e, result);
+        if (result.getHandled()) {
+          return result.getHandleResult();
+        }
+      }
+    }
+
+    //re-throw the exception as an IOException 
+    throw new IOException(e);
+  }
+
+}
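
A short usage sketch of the chain's contract, assuming the hypothetical SkipEOFHandler above is on the classpath:

    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
    import org.apache.hadoop.mapred.JobConf;

    public class ChainExample {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        // com.example.SkipEOFHandler is hypothetical, not part of this commit.
        conf.set(HiveIOExceptionHandlerChain.HIVE_IO_EXCEPTION_HANDLE_CHAIN,
            "com.example.SkipEOFHandler");
        HiveIOExceptionHandlerChain chain =
            HiveIOExceptionHandlerChain.getHiveIOExceptionHandlerChain(conf);
        // Handled case: the handler marks the result as handled with a handle
        // result of false, so this returns false ("reader exhausted").
        boolean more = chain.handleRecordReaderNextException(new EOFException());
        System.out.println("next() would return: " + more);
        // An exception no handler claims is re-thrown as an IOException.
      }
    }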

Added: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java?rev=1130427&view=auto
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java (added)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionHandlerUtil.java Thu Jun  2 07:52:45 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class HiveIOExceptionHandlerUtil {
+
+  private static ThreadLocal<HiveIOExceptionHandlerChain> handlerChainInstance = 
+    new ThreadLocal<HiveIOExceptionHandlerChain>();
+
+  private static HiveIOExceptionHandlerChain get(JobConf job) {
+    HiveIOExceptionHandlerChain cache = HiveIOExceptionHandlerUtil.handlerChainInstance
+        .get();
+    if (cache == null) {
+      HiveIOExceptionHandlerChain toSet = HiveIOExceptionHandlerChain
+          .getHiveIOExceptionHandlerChain(job);
+      handlerChainInstance.set(toSet);
+      cache = HiveIOExceptionHandlerUtil.handlerChainInstance.get();
+    }
+    return cache;
+  }
+
+  /**
+   * Handles an exception thrown while creating a record reader. If an
+   * exception is raised during record reader construction and one handler
+   * can handle it, that handler should return a record reader, which is
+   * either a dummy empty record reader or a record reader with recovery logic.
+   * 
+   * @param e
+   * @param job
+   * @return
+   * @throws IOException
+   */
+  public static RecordReader handleRecordReaderCreationException(Exception e,
+      JobConf job) throws IOException {
+    HiveIOExceptionHandlerChain ioExpectionHandlerChain = get(job);
+    if (ioExpectionHandlerChain != null) {
+      return ioExpectionHandlerChain.handleRecordReaderCreationException(e);
+    }
+    throw new IOException(e);
+  }
+
+  /**
+   * Handles an exception thrown when calling a record reader's next(). If
+   * the exception is handled by one handler, that handler's handle result is
+   * returned. Otherwise the exception is re-thrown, either by a handler or
+   * at the end of the handler chain.
+   * 
+   * @param e
+   * @param job
+   * @return
+   * @throws IOException
+   */
+  public static boolean handleRecordReaderNextException(Exception e, JobConf job)
+      throws IOException {
+    HiveIOExceptionHandlerChain ioExpectionHandlerChain = get(job);
+    if (ioExpectionHandlerChain != null) {
+      return ioExpectionHandlerChain.handleRecordReaderNextException(e);
+    }
+    throw new IOException(e);
+  }
+
+}
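
This is the wrapping pattern the commit applies to Hive's readers, restated as a standalone sketch (MyGuardedNext is illustrative only). Note the util caches one chain per thread, so a thread's handlers come from the first JobConf it passes in:

    import java.io.IOException;

    import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;

    public class MyGuardedNext {
      // Same shape as HiveRecordReader.doNext() after this commit.
      public static boolean next(RecordReader<LongWritable, Text> reader,
          LongWritable key, Text value, JobConf job) throws IOException {
        try {
          return reader.next(key, value);
        } catch (Exception e) {
          // Delegates to the configured chain; re-thrown as an IOException
          // when no handler claims the exception.
          return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, job);
        }
      }
    }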

Added: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java?rev=1130427&view=auto
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java (added)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/io/HiveIOExceptionNextHandleResult.java Thu Jun  2 07:52:45 2011
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.io;
+
+/**
+ * A container to store the handling result for exceptions produced in a
+ * record reader's next(). It contains two fields: one stores whether the
+ * exception has been handled, and the other stores the handling result.
+ */
+public class HiveIOExceptionNextHandleResult {
+
+  // whether this exception has been handled
+  private boolean handled;
+
+  // the handling result
+  private boolean handleResult;
+
+  public boolean getHandled() {
+    return handled;
+  }
+
+  public void setHandled(boolean handled) {
+    this.handled = handled;
+  }
+
+  public boolean getHandleResult() {
+    return handleResult;
+  }
+
+  public void setHandleResult(boolean handleResult) {
+    this.handleResult = handleResult;
+  }
+  
+  public void clear() {
+    handled = false;
+    handleResult = false;
+  }
+
+}
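
A minimal sketch of how a handler populates this container (the class name ResultContractExample is illustrative):

    import org.apache.hadoop.hive.io.HiveIOExceptionNextHandleResult;

    public class ResultContractExample {
      public static void main(String[] args) {
        HiveIOExceptionNextHandleResult result = new HiveIOExceptionNextHandleResult();
        // A handler that claims an exception sets both fields:
        result.setHandled(true);       // the chain stops at this handler
        result.setHandleResult(false); // becomes next()'s return value
        System.out.println(result.getHandled() + " " + result.getHandleResult());
        result.clear();                // reset both flags for reuse
      }
    }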


