pig-commits mailing list archives

From dvrya...@apache.org
Subject svn commit: r956794 - in /hadoop/pig/trunk: ./ lib/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/ src...
Date Tue, 22 Jun 2010 07:04:41 GMT
Author: dvryaboy
Date: Tue Jun 22 07:04:41 2010
New Revision: 956794

URL: http://svn.apache.org/viewvc?rev=956794&view=rev
Log:
PIG-1427: Monitor and kill runaway UDFs

Added:
    hadoop/pig/trunk/lib/guava-r03.jar   (with props)
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/build.xml
    hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 22 07:04:41 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1427: Monitor and kill runaway UDFs (dvryaboy)
+
 PIG-1428: Make a StatusReporter singleton available for incrementing counters (dvryaboy)
 
 PIG-972: Make describe work with nested foreach (aniket486 via daijy)

Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Tue Jun 22 07:04:41 2010
@@ -51,6 +51,7 @@
     <property name="hbase.jarfile" value="hbase-0.20.0.jar" />
     <property name="hbase.test.jarfile" value="hbase-0.20.0-test.jar" />
 	<property name="zookeeper.jarfile" value="zookeeper-hbase-1329.jar" />
+	<property name="guava.jarfile" value="guava-r03.jar" />
 	
     <!-- javac properties -->
     <property name="javac.debug" value="on" />
@@ -167,6 +168,7 @@
 	<path refid="compile.classpath"/>	
         <fileset file="${lib.dir}/${hadoop.jarfile}" />
         <fileset file="${lib.dir}/${hbase.jarfile}" />
+		<fileset file="${lib.dir}/${guava.jarfile}" />
         <fileset file="${lib.dir}/${hbase.test.jarfile}" />
     	<fileset file="${lib.dir}/${zookeeper.jarfile}"/>
     	<fileset file="${ivy.lib.dir}/jackson-mapper-asl-${jackson.version}.jar"/>
@@ -178,6 +180,7 @@
     <!-- javadoc-classpath -->
     <path id="javadoc-classpath">
         <fileset file="${lib.dir}/${hbase.jarfile}" />
+		<fileset file="${lib.dir}/${guava.jarfile}" />
         <fileset file="${lib.dir}/${hbase.test.jarfile}" />
 	<path refid="javadoc.classpath"/>	
     </path> 

Added: hadoop/pig/trunk/lib/guava-r03.jar
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/guava-r03.jar?rev=956794&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/trunk/lib/guava-r03.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml Tue Jun 22 07:04:41 2010
@@ -1229,7 +1229,6 @@ public class IntMax extends EvalFunc&lt;
 <p>One problem that users run into is when they make assumption about how many times a constructor for their UDF is called. For instance, they might be creating side files in the store function and doing it in the constructor seems like a good idea. The problem with this approach is that in most cases Pig instantiates functions on the client side to, for instance, examine the schema of the data.  </p>
 <p>Users should not make assumptions about how many times a function is instantiated; instead, they should make their code resilient to multiple instantiations. For instance, they could check if the files exist before creating them. </p>
 
-
 </section>
 
 <section>
@@ -1250,6 +1249,45 @@ public class IntMax extends EvalFunc&lt;
 <p>To store information, the UDF calls getUDFProperties. This returns a Properties object which the UDF can record the information in or read the information from. To avoid name space conflicts UDFs are required to provide a signature when obtaining a Properties object. This can be done in two ways. The UDF can provide its Class object (via this.getClass()). In this case, every instantiation of the UDF will be given the same Properties object. The UDF can also provide its Class plus an array of Strings. The UDF can pass its constructor arguments, or some other identifying strings. This allows each instantiation of the UDF to have a different properties object thus avoiding name space collisions between instantiations of the UDF.</p>
 </section>
 
+<section>
+<title>Monitoring long-running UDFs</title>
+<p>Sometimes a UDF that executes very quickly in the vast majority of cases turns out to run exceedingly slowly on occasion. This can happen, for example, if the UDF uses complex regular expressions to parse free-form strings, or if it communicates with an external service. As of version 0.8, Pig provides a facility for monitoring how long each invocation of a UDF takes and terminating the invocation if it runs too long. This facility is turned on with a simple Java annotation:</p>
+	
+<source>
+	import org.apache.pig.builtin.MonitoredUDF;
+	
+	@MonitoredUDF
+	public class MyUDF extends EvalFunc&lt;Integer&gt; {
+	  /* implementation goes here */
+	}
+</source>
+
+<p>Simply annotating your UDF in this way causes Pig to stop waiting for the UDF's exec() method if it runs for more than 10 seconds, interrupt it, and return the default value of null instead. The duration of the timeout and the default value can be specified in the annotation, if desired:</p>
+
+<source>
+	import java.util.concurrent.TimeUnit;
+	import org.apache.pig.builtin.MonitoredUDF;
+	@MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 100, intDefault = 10)
+	public class MyUDF extends EvalFunc&lt;Integer&gt; {
+	  /* implementation goes here */
+	}
+</source>
+
+<p>The intDefault, longDefault, doubleDefault, floatDefault, and stringDefault elements can be specified in the annotation; the one matching the return type of the UDF is used. Custom defaults for tuples and bags (or for maps and byte arrays) are not supported at this time; null is always used for those.</p>
+
+<p>If desired, custom error handling can be implemented by creating a subclass of MonitoredUDFExecutor.ErrorCallback and redefining its static handleError and/or handleTimeout methods with the same signatures. Both methods receive the EvalFunc instance that failed and the exception that was raised, so you may use any state you have in the UDF to process the errors as desired. The default behavior is to log the error and increment a Hadoop counter every time an error or timeout is encountered. Once you have an ErrorCallback implementation that performs your custom logic, you can provide it in the annotation:</p>
+
+<source>
+	import org.apache.pig.builtin.MonitoredUDF;
+
+	@MonitoredUDF(errorCallback=MySpecialErrorCallback.class)
+	public class MyUDF extends EvalFunc&lt;Integer&gt; {
+	  /* implementation goes here */
+	}
+</source>
+
+<p>Currently the MonitoredUDF annotation works with regular and Algebraic UDFs, but has no effect on UDFs that run in the Accumulator mode.</p>
+
 </section>
 
 </body>
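
The timeout behaviour described in the udf.xml change can be sketched with the plain JDK. This is a minimal, hypothetical illustration and not Pig's code: the class TimeoutRunner and its method runWithTimeout are invented here, and the sketch uses daemon threads where MonitoredUDFExecutor wraps a 10-thread pool in Guava's exiting executor. It submits the call to a pool, waits up to the time budget, returns a fallback on timeout or failure, and cancels the task with interruption, which is the same sequence monitorExec follows.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of Pig): runs each call on a pool thread and
// returns a fallback value if the call fails or exceeds its time budget.
final class TimeoutRunner implements AutoCloseable {
    private final ExecutorService pool = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "timeout-runner");
        t.setDaemon(true); // daemon threads cannot keep the JVM from exiting
        return t;
    });

    <T> T runWithTimeout(Callable<T> task, long duration, TimeUnit unit, T fallback) {
        Future<T> future = pool.submit(task);
        try {
            return future.get(duration, unit);
        } catch (TimeoutException | ExecutionException e) {
            return fallback; // too slow, or the task itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            return fallback;
        } finally {
            future.cancel(true); // interrupt the worker if it is still running
        }
    }

    @Override
    public void close() {
        pool.shutdownNow(); // plays the role of MonitoredUDFExecutor.terminate()
    }

    public static void main(String[] args) {
        try (TimeoutRunner runner = new TimeoutRunner()) {
            Integer quick = runner.runWithTimeout(() -> 42, 1, TimeUnit.SECONDS, -1);
            Integer slow = runner.runWithTimeout(() -> {
                Thread.sleep(2_000);
                return 1;
            }, 100, TimeUnit.MILLISECONDS, -1);
            System.out.println(quick + " " + slow); // prints "42 -1"
        }
    }
}
```

One caveat applies to this sketch and to the pattern in general: Future.cancel(true) only interrupts the worker thread. Code that never checks for interruption, such as a long-running java.util.regex match, keeps running in the background even after the caller has moved on with the fallback value.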

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Jun 22 07:04:41 2010
@@ -36,6 +36,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import org.apache.pig.builtin.MonitoredUDF;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -62,6 +64,9 @@ public class POUserFunc extends Expressi
     public static final byte INTERMEDIATE = 1;
     public static final byte FINAL = 2;
     private boolean initialized = false;
+    private MonitoredUDFExecutor executor = null;
+    
+    
 
     public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp);
@@ -93,6 +98,9 @@ public class POUserFunc extends Expressi
 
     private void instantiateFunc(FuncSpec fSpec) {
         this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+        if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
+            executor = new MonitoredUDFExecutor(func);
+        }
         //the next couple of initializations do not work as intended for the following reasons
         //the reporter and pigLogger are member variables of PhysicalOperator
         //when instanitateFunc is invoked at deserialization time, both
@@ -205,8 +213,12 @@ public class POUserFunc extends Expressi
                         result.result = ((Accumulator)func).getValue();	
                         ((Accumulator)func).cleanup();
                     }
-                } else {					
+                } else {
+                    if (executor != null) {
+                        result.result = executor.monitorExec((Tuple) result.result);
+                    } else {
                     result.result = func.exec((Tuple) result.result);
+                    }
                 }
                 if(resultType == DataType.BYTEARRAY) {
                     if(res.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) {
@@ -369,6 +381,9 @@ public class POUserFunc extends Expressi
 
     public void finish() {
         func.finish();
+        if (executor != null) {
+            executor.terminate();
+        }
     }
 
     public Schema outputSchema(Schema input) {
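
POUserFunc decides once, in instantiateFunc, whether to route calls through a MonitoredUDFExecutor, by checking func.getClass().isAnnotationPresent(MonitoredUDF.class). That check only works because MonitoredUDF is declared with @Retention(RetentionPolicy.RUNTIME). The self-contained sketch below uses hypothetical names (Watched, NotRetained, SlowFunc, PlainFunc, WrapDecision) to show the same up-front decision, and what happens to an annotation that keeps the default CLASS retention.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation; RUNTIME retention makes it visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Watched {
    long millis() default 10_000;
}

// Hypothetical annotation with the default (CLASS) retention:
// it is recorded in the class file but is not visible via reflection at runtime.
@Target(ElementType.TYPE)
@interface NotRetained {}

@Watched(millis = 250)
class SlowFunc {}

@NotRetained
class PlainFunc {}

final class WrapDecision {
    // Same shape as the check in POUserFunc.instantiateFunc: decide once, up front,
    // whether calls should go through a monitoring wrapper.
    static String describe(Object func) {
        Class<?> c = func.getClass();
        if (c.isAnnotationPresent(Watched.class)) {
            return "monitored for " + c.getAnnotation(Watched.class).millis() + " ms";
        }
        return "called directly";
    }

    public static void main(String[] args) {
        System.out.println(describe(new SlowFunc()));  // monitored for 250 ms
        System.out.println(describe(new PlainFunc())); // called directly
        System.out.println(PlainFunc.class.isAnnotationPresent(NotRetained.class)); // false
    }
}
```

Making the decision at instantiation time, as POUserFunc does, keeps the reflective lookup out of the per-tuple path; getNext only tests whether the executor field is null.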

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java?rev=956794&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java Tue Jun 22 07:04:41 2010
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * MonitoredUDFExecutor is used to watch the execution of a UDF, and interrupt it if the UDF takes an
+ * exceedingly long time. The default value from the MonitoredUDF annotation (null unless set) is returned if the UDF times out.
+ * 
+ * Optionally, UDF authors can extend {@link ErrorCallback} to provide custom logic for 
+ * handling errors and timeouts.
+ * 
+ */
+public class MonitoredUDFExecutor {
+
+    private final ExecutorService exec;
+    private final TimeUnit timeUnit;
+    private final long duration;
+    private final Object defaultValue;
+    @SuppressWarnings("unchecked")
+    private final EvalFunc evalFunc;
+    private final Function<Tuple, Object> closure;
+
+    // Let us reflect upon our errors.
+    private final Class<? extends ErrorCallback> errorCallback;
+    private final Method errorHandler;
+    private final Method timeoutHandler;
+
+    @SuppressWarnings("unchecked")
+    public MonitoredUDFExecutor(EvalFunc udf) {
+        // is 10 enough? This is pretty arbitrary.
+        exec = MoreExecutors.getExitingExecutorService(new ScheduledThreadPoolExecutor(10));
+        this.evalFunc = udf;
+        MonitoredUDF anno = udf.getClass().getAnnotation(MonitoredUDF.class);
+        timeUnit = anno.timeUnit();
+        duration = anno.duration();
+        errorCallback = anno.errorCallback();
+
+        // The exceptions really should not happen since our handlers are defined by the parent class which 
+        // must be extended by all custom handlers.
+        try {
+            errorHandler = errorCallback.getMethod("handleError", EvalFunc.class, Exception.class);
+            timeoutHandler = errorCallback.getMethod("handleTimeout", EvalFunc.class, Exception.class);
+        } catch (SecurityException e1) {
+            throw new RuntimeException("Unable to use the monitored callback due to a SecurityException while working with "
+                    + evalFunc.getClass().getName(), e1);
+        } catch (NoSuchMethodException e1) {
+            throw new RuntimeException("Unable to use the monitored callback because a required method was not found while working with "
+                    + evalFunc.getClass().getName(), e1);
+        }
+
+        Type retType = udf.getReturnType();
+        defaultValue = getDefaultValue(anno, retType);
+        closure = new Function<Tuple, Object>() {
+            public Object apply(Tuple input) {
+                try {
+                    return evalFunc.exec(input);
+                } catch (IOException e) {
+                    // I don't see a CheckedFunction in Guava. Resorting to this hackery.
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    private Object getDefaultValue(MonitoredUDF anno, Type retType) {
+        if (retType.equals(Integer.TYPE) || retType.equals(Integer.class)) {
+            return (anno.intDefault().length == 0) ? null : anno.intDefault()[0];
+        } else if (retType.equals(Double.TYPE) || retType.equals(Double.class)) {
+            return (anno.doubleDefault().length == 0) ? null : anno.doubleDefault()[0];
+        } else if (retType.equals(Float.TYPE) || retType.equals(Float.class)) {
+            return (anno.floatDefault().length == 0) ? null : anno.floatDefault()[0];
+        } else if (retType.equals(Long.TYPE) || retType.equals(Long.class)) {
+            return (anno.longDefault().length == 0) ? null : anno.longDefault()[0];
+        } else if (retType.equals(String.class)) {
+            return (anno.stringDefault().length == 0) ? null : anno.stringDefault()[0];
+        } else {
+            // Default default is null.
+            return null;
+        }
+    }
+
+    /**
+     * This method *MUST* be called by POUserFunc.finish().
+     * We also use an ExitingExecutorService, just in case it is not.
+     */
+    public void terminate() {
+        exec.shutdownNow();
+    }
+
+    /**
+     * UDF authors can optionally extend this class and provide the class of their custom callbacks in the annotation
+     * to perform their own handling of errors and timeouts.
+     */
+
+    public static class ErrorCallback {
+
+        @SuppressWarnings("unchecked")
+        public static void handleError(EvalFunc evalFunc, Exception e) {
+            evalFunc.getLogger().error(e);
+            StatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null && 
+                    reporter.getCounter(evalFunc.getClass().getName(), e.toString()) != null) {
+                reporter.getCounter(evalFunc.getClass().getName(), e.toString()).increment(1L);
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        public static void handleTimeout(EvalFunc evalFunc, Exception e) {
+            evalFunc.getLogger().error(e);
+            StatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null && 
+                    reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout") != null) {
+                reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout").increment(1L);
+            }
+        }
+    }
+
+    public Object monitorExec(final Tuple input) throws IOException {
+        CheckedFuture<Object, Exception> f = 
+            Futures.makeChecked(
+                    // the Future whose exceptions we want to catch
+                    exec.submit(new Callable<Object>() {
+                        @Override
+                        public Object call() throws Exception {
+                            return closure.apply(input);
+                        }
+                    }), 
+                    // How to map those exceptions; we simply rethrow them.
+                    // Theoretically we could do some handling of 
+                    // CancellationException, ExecutionException and InterruptedException here
+                    // and do something special for UDF IOExceptions as opposed to thread exceptions.
+                    new Function<Exception, Exception>() { 
+                        public Exception apply(Exception e) { 
+                            return e; 
+                        } 
+                    });
+
+        Object result = defaultValue;
+        
+        // The outer try "should never happen" (tm).
+        try {
+            try {
+                result = f.get(duration, timeUnit);
+            } catch (TimeoutException e) {
+                timeoutHandler.invoke(null, evalFunc, e);
+            } catch (Exception e) {
+                errorHandler.invoke(null, evalFunc, e);
+            } finally {
+                f.cancel(true);
+            }
+        } catch (IllegalArgumentException e) {
+            throw new IOException(e);
+        } catch (IllegalAccessException e) {
+            throw new IOException(e);
+        } catch (InvocationTargetException e) {
+            throw new IOException(e);
+        }
+        return result;
+    }
+}
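
MonitoredUDFExecutor looks up handleError and handleTimeout on the configured callback class with Class.getMethod and invokes them with a null receiver, because they are static. Static methods are hidden rather than overridden, so a subclass that redefines only handleTimeout (as CustomErrorCallback in TestMonitoredUDF does) still resolves handleError to the base implementation through getMethod's superclass search. The sketch below reproduces that lookup with hypothetical stand-in classes (BaseCallback, CustomCallback, CallbackLookup); it is not Pig's ErrorCallback itself, and it uses Object instead of EvalFunc to stay self-contained.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for MonitoredUDFExecutor.ErrorCallback.
class BaseCallback {
    static String lastEvent = "none";

    public static void handleError(Object func, Exception e) {
        lastEvent = "base error: " + e.getMessage();
    }

    public static void handleTimeout(Object func, Exception e) {
        lastEvent = "base timeout";
    }
}

// Hypothetical user callback. This static method hides (does not override)
// BaseCallback.handleTimeout; handleError is inherited unchanged.
class CustomCallback extends BaseCallback {
    public static void handleTimeout(Object func, Exception e) {
        lastEvent = "custom timeout";
    }
}

final class CallbackLookup {
    // Resolves the handler the way MonitoredUDFExecutor does: getMethod also
    // searches superclasses, so handlers that are not redefined fall back to the base.
    static String fire(Class<? extends BaseCallback> callback, String handler, Exception e) {
        try {
            Method m = callback.getMethod(handler, Object.class, Exception.class);
            m.invoke(null, new Object(), e); // null receiver: the method is static
            return BaseCallback.lastEvent;
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        Exception e = new RuntimeException("boom");
        System.out.println(fire(CustomCallback.class, "handleTimeout", e)); // custom timeout
        System.out.println(fire(CustomCallback.class, "handleError", e));   // base error: boom
        System.out.println(fire(BaseCallback.class, "handleTimeout", e));   // base timeout
    }
}
```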

Added: hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java?rev=956794&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java Tue Jun 22 07:04:41 2010
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor.ErrorCallback;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+
+/**
+ * Describes how the execution of a UDF should be monitored, and what 
+ * to do if it times out.
+ * <p>
+ * NOTE: does not work with UDFs that implement the Accumulator interface
+ * <p>
+ *     
+ * Setting a default value will cause it to be used instead of null when the UDF times out.
+ * The value matching the UDF's return type (int, long, double, float, or String) is used.
+ * The default fields are arrays because annotation elements cannot default to null; an empty array means "not set".
+ * <p>
+ * Set them as if they were primitives: <code>@MonitoredUDF( intDefault=5 )</code>
+ * <p>
+ * There is currently no way to provide a default ByteArray, Tuple, Map, or Bag. Null will always be used for those.
+ * <p>
+ * Currently this annotation is ignored when the Accumulator interface is used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@Documented
+@Retention(value=RetentionPolicy.RUNTIME)
+public @interface MonitoredUDF {
+    /**
+     * Time Units in which to measure timeout value.
+     * @return Time Units in which to measure timeout value.
+     */
+    TimeUnit timeUnit() default TimeUnit.SECONDS;
+    
+    /**
+     * Number of time units after which the execution should be halted and default returned.
+     * @return Number of time units after which the execution should be halted and default returned.
+     */
+    int duration() default 10;
+    
+    int[] intDefault() default {};
+    long[] longDefault() default {};
+    double[] doubleDefault() default {};
+    float[] floatDefault() default {};
+    String[] stringDefault() default {};
+  
+    /**
+     * A UDF author can extend MonitoredUDFExecutor.ErrorCallback and provide the subclass
+     * to the annotation in order to perform custom error handling.
+     * @return the ErrorCallback class whose static handlers are invoked on errors and timeouts.
+     */
+    Class<? extends ErrorCallback> errorCallback() default MonitoredUDFExecutor.ErrorCallback.class;
+}
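
The default elements of MonitoredUDF are arrays, and MonitoredUDFExecutor.getDefaultValue picks the one matching the UDF's return type, falling back to null when the array is empty. The sketch below shows that pattern with a hypothetical annotation (Fallbacks) and hypothetical classes (CountingUdf, UnconfiguredUdf, DefaultPicker), restricted to int and String for brevity. It takes a Class rather than the java.lang.reflect.Type that Pig's getReturnType() returns.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical annotation: an empty array means "no default configured",
// because annotation elements cannot have null as a default value.
@Retention(RetentionPolicy.RUNTIME)
@interface Fallbacks {
    int[] intDefault() default {};
    String[] stringDefault() default {};
}

@Fallbacks(intDefault = 7) // a single value is shorthand for a one-element array
class CountingUdf {}

@Fallbacks
class UnconfiguredUdf {}

final class DefaultPicker {
    // Chooses the configured default matching the return type, or null.
    static Object pick(Class<?> udfClass, Class<?> returnType) {
        Fallbacks f = udfClass.getAnnotation(Fallbacks.class);
        if (f == null) {
            return null;
        }
        if (returnType == Integer.class || returnType == Integer.TYPE) {
            return f.intDefault().length == 0 ? null : Integer.valueOf(f.intDefault()[0]);
        }
        if (returnType == String.class) {
            return f.stringDefault().length == 0 ? null : f.stringDefault()[0];
        }
        return null; // other types: no configurable default
    }

    public static void main(String[] args) {
        System.out.println(pick(CountingUdf.class, Integer.class));     // 7
        System.out.println(pick(UnconfiguredUdf.class, Integer.class)); // null
        System.out.println(pick(CountingUdf.class, String.class));      // null
    }
}
```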

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java?rev=956794&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java Tue Jun 22 07:04:41 2010
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor.ErrorCallback;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+public class TestMonitoredUDF {
+
+    @Test
+    public void testTimeout() throws IOException {
+
+        SimpleUDF udf = new SimpleUDF(1000);
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        assertNull(exec.monitorExec(null));
+    }
+
+    @Test
+    public void testTimeoutWithDefault() throws IOException {
+
+        SimpleIntUDF udf = new SimpleIntUDF();
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        assertEquals( SimpleIntUDF.DEFAULT, ((Integer) exec.monitorExec(null)).intValue());
+    }
+
+    @Test
+    public void testCustomErrorHandler() throws IOException {
+
+        ErrorCallbackUDF udf = new ErrorCallbackUDF();
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        exec.monitorExec(null);
+        assertTrue(thereWasATimeout);
+    }
+
+    @Test
+    public void testNoTimeout() throws IOException {
+        SimpleUDF udf = new SimpleUDF(100);
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        assertTrue((Boolean) exec.monitorExec(null));
+    }
+
+    public static void main(String[] args) throws IOException {
+        long startTime = System.currentTimeMillis();
+        long unmonitoredTime = 0, monitoredTime = 0;
+        
+        int[] numReps = { 1000, 10000, 100000, 1000000};
+        MonitoredNoOpUDF monitoredUdf  = new MonitoredNoOpUDF();
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(monitoredUdf);
+        UnmonitoredNoOpUDF unmonitoredUdf = new UnmonitoredNoOpUDF();
+        // warm up
+        System.out.println("Warming up.");
+        for (int i : numReps) {
+            for (int j=0; j < i; j++) {
+                exec.monitorExec(null);
+                unmonitoredUdf.exec(null);
+            }
+        }
+        System.out.println("Warmed up. Timing.");
+        // tests!
+        for (int k = 0; k < 5; k++) {
+            for (int i : numReps) {
+                startTime = System.currentTimeMillis();
+                for (int j = 0; j < i; j++) {
+                    exec.monitorExec(null);
+                }
+                monitoredTime = System.currentTimeMillis() - startTime;
+                startTime = System.currentTimeMillis();
+                for (int j = 0; j < i; j++) {
+                    unmonitoredUdf.exec(null);
+                }
+                unmonitoredTime = System.currentTimeMillis() - startTime;
+                System.out.println("Reps: " + i + " monitored: " + monitoredTime + " unmonitored: " + unmonitoredTime);
+            }    
+        }
+    }
+
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 500)
+    public class SimpleUDF extends EvalFunc<Boolean> {
+        int wait;
+        public SimpleUDF(int wait) {
+            this.wait = wait;
+        }
+
+        @Override
+        public Boolean exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(wait);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return true;
+        }
+    }
+    
+    public static class UnmonitoredNoOpUDF extends EvalFunc<Boolean> {
+        @Override public Boolean exec(Tuple input) throws IOException { 
+            System.currentTimeMillis(); return true; }
+    }
+
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 500)
+    public static class MonitoredNoOpUDF extends EvalFunc<Boolean> {
+        @Override public Boolean exec(Tuple input) throws IOException { 
+            System.currentTimeMillis(); return true; }
+    }
+
+    
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 100, intDefault = SimpleIntUDF.DEFAULT)
+    public class SimpleIntUDF extends EvalFunc<Integer> {
+        public static final int DEFAULT = 123;
+        public static final int NOT_DEFAULT = 321;
+
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return  NOT_DEFAULT;
+        }
+    }
+
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 100, errorCallback = CustomErrorCallback.class)
+    public class ErrorCallbackUDF extends EvalFunc<Integer> {
+
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return null;
+        }
+
+    }
+
+    static boolean thereWasATimeout = false;
+
+    public static class CustomErrorCallback extends ErrorCallback {
+
+        @SuppressWarnings("unchecked")
+        public static void handleTimeout(EvalFunc evalFunc, Exception e) {
+            thereWasATimeout = true;
+        }
+    }
+}
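
The timeout tests use UDFs that sleep longer than their budget, and they depend on the sleeping thread being interrupted when monitorExec gives up and calls f.cancel(true). The following JDK-only sketch (hypothetical class CancelCheck, not part of the Pig test suite) checks that effect directly. A "started" latch ensures the task is already running before the timeout clock starts, so the result does not depend on thread scheduling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelCheck {
    // Returns true if a task that overran its timeout was interrupted by cancel(true).
    static boolean timedOutTaskIsInterrupted() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<Boolean> f = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(5_000); // a "UDF" that is far too slow
                    return true;
                } catch (InterruptedException e) {
                    interrupted.set(true); // the cancellation reached the worker
                    return false;
                } finally {
                    finished.countDown();
                }
            });
            started.await(); // make sure the task is running before timing it
            try {
                f.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException expected) {
                // Expected: the task overran its budget.
            } finally {
                f.cancel(true); // the same call monitorExec makes in its finally block
            }
            finished.await(1, TimeUnit.SECONDS); // let the worker unwind
            return interrupted.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timedOutTaskIsInterrupted()); // true
    }
}
```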


