hadoop-hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From na...@apache.org
Subject svn commit: r776883 - in /hadoop/hive/trunk: CHANGES.txt build-common.xml ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
Date Wed, 20 May 2009 22:27:22 GMT
Author: namit
Date: Wed May 20 22:27:21 2009
New Revision: 776883

URL: http://svn.apache.org/viewvc?rev=776883&view=rev
Log:
HIVE-410: Heartbeating for streaming jobs should not depend on stdout


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/build-common.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=776883&r1=776882&r2=776883&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed May 20 22:27:21 2009
@@ -143,6 +143,8 @@
     HIVE-498. Fix null pointer exception in UDFRegExp when
     the expression string is empty. (Zheng Shao via athusoo)
 
+    HIVE-410: Heartbeating for streaming jobs should not depend on stdout
+
 Release 0.3.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=776883&r1=776882&r2=776883&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Wed May 20 22:27:21 2009
@@ -242,7 +242,7 @@
   <target name="test"
   	depends="test-conditions,gen-test,compile-test,test-jar,test-init">
     <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
-           fork="yes" maxmemory="256m" dir="${basedir}" timeout="${test.timeout}"
+           fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}"
            errorProperty="tests.failed" failureProperty="tests.failed" filtertrace="off">
       <!--
       <jvmarg value="-Xdebug"/>

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=776883&r1=776882&r2=776883&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Wed May 20 22:27:21 2009
@@ -61,6 +61,10 @@
   transient volatile Throwable scriptError = null;
 
   /**
+   * Timer to send periodic reports back to the tracker.
+   */
+  transient Timer rpTimer;
+  /**
    * addJobConfToEnvironment is shamelessly copied from hadoop streaming.
    */
   static String safeEnvVarName(String var) {
@@ -214,6 +218,22 @@
                                    (HiveConf.getIntVar(hconf, HiveConf.ConfVars.SCRIPTERRORLIMIT)),
                                    "ErrorProcessor");
       errThread.start();
+      
+      /* Timer that periodically reports to the jobtracker (every half expiry
+         interval, or every 5 minutes if that setting is absent). This ensures that
+         even if the user script returns no rows for longer than that, a progress
+         report is sent so that the tracker does not think that the job is dead.
+      */
+      // Integer.decode(null) throws, so test the raw setting before parsing it.
+      String exp_str = hconf.get("mapred.tasktracker.expiry.interval");
+      int exp_int;
+      if (exp_str != null)
+        exp_int = Integer.decode(exp_str).intValue() / 2;
+      else
+        exp_int = 300000;
+
+      rpTimer = new Timer(true);
+      rpTimer.scheduleAtFixedRate(new ReporterTask(reporter), 0, exp_int);
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -256,6 +276,7 @@
           new_abort = true;
         };
       } catch (IOException e) {
+        LOG.error("Got ioexception: " + e.getMessage());
         new_abort = true;
       } catch (InterruptedException e) { }
     }
@@ -443,4 +464,25 @@
   public String getName() {
     return "SCR";
   }
+  
+  class ReporterTask extends TimerTask {
+    /**
+     * Reporter used to signal progress to the jobtracker; may be null.
+     */
+    private final Reporter rp;
+
+    /**
+     * @param rp reporter to ping on every timer tick; null disables reporting.
+     */
+    public ReporterTask(Reporter rp) {
+      this.rp = rp;
+    }
+
+    @Override
+    public void run() {
+      // An unchecked exception from a TimerTask terminates its Timer thread,
+      // so a null reporter must be skipped rather than dereferenced.
+      if (rp != null)
+        rp.progress();
+    }
+  }
 }



Mime
View raw message