pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From knogu...@apache.org
Subject svn commit: r1733355 - in /pig/trunk: CHANGES.txt contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java src/org/apache/pig/builtin/RANDOM.java test/org/apache/pig/test/TestBuiltin.java
Date Wed, 02 Mar 2016 19:37:03 GMT
Author: knoguchi
Date: Wed Mar  2 19:37:03 2016
New Revision: 1733355

URL: http://svn.apache.org/viewvc?rev=1733355&view=rev
Log:
PIG-4819: RANDOM() udf can lead to missing or redundant records (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
    pig/trunk/src/org/apache/pig/builtin/RANDOM.java
    pig/trunk/test/org/apache/pig/test/TestBuiltin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar  2 19:37:03 2016
@@ -97,6 +97,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4819: RANDOM() udf can lead to missing or redundant records (knoguchi)
+
 PIG-4816: Read a null scalar causing a Tez failure (daijy)
 
 PIG-4818: Single quote inside comment in GENERATE is not being ignored (knoguchi)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java Wed Mar  2 19:37:03 2016
@@ -19,17 +19,28 @@
 package org.apache.pig.piggybank.evaluation.math;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.data.DataType;
 
 public class RANDOM extends EvalFunc<Double>{
+    private Random r = null;
 
-	public Double exec(Tuple input) throws IOException {
-		return Math.random();
-	}
+    @Override
+    public Double exec(Tuple input) throws IOException {
+        if( r == null ) {
+            int jobidhash = PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
+            int taskIndex = Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
+            r = new Random(((long) jobidhash) << 32 | (taskIndex & 0xffffffffL));
+        }
+        return r.nextDouble();
+    }
 
     @Override
     public Schema outputSchema(Schema input) {

Modified: pig/trunk/src/org/apache/pig/builtin/RANDOM.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RANDOM.java?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/RANDOM.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/RANDOM.java Wed Mar  2 19:37:03 2016
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.util.Random;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.data.DataType;
@@ -32,20 +35,24 @@ import org.apache.pig.data.DataType;
  */
 @Nondeterministic
 public class RANDOM extends EvalFunc<Double>{
-    private Random r;
+    private Random r = null;
 
     public RANDOM() {
-        r = new Random();
     }
 
     public RANDOM(String seed) {
         r = new Random(Long.parseLong(seed));
     }
 
-	@Override
-	public Double exec(Tuple input) throws IOException {
-		return r.nextDouble();
-	}
+    @Override
+    public Double exec(Tuple input) throws IOException {
+        if( r == null ) {
+            int jobidhash = PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
+            int taskIndex = Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
+            r = new Random(((long) jobidhash) << 32 | (taskIndex & 0xffffffffL));
+        }
+        return r.nextDouble();
+    }
 
     @Override
     public Schema outputSchema(Schema input) {

Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Wed Mar  2 19:37:03 2016
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -40,13 +41,20 @@ import java.util.Random;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.builtin.ARITY;
 import org.apache.pig.builtin.AddDuration;
 import org.apache.pig.builtin.BagSize;
@@ -136,6 +144,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestBuiltin {
+    private static final Log LOG = LogFactory.getLog(TestBuiltin.class);
     private static PigServer pigServer;
     private static Properties properties;
     private static MiniGenericCluster cluster;
@@ -3206,17 +3215,96 @@ public class TestBuiltin {
         pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
         pigServer.registerQuery("B = foreach A generate name, UniqueID();");
         Iterator<Tuple> iter = pigServer.openIterator("B");
-        iter.next().get(1).equals("0-0");
-        iter.next().get(1).equals("0-1");
-        iter.next().get(1).equals("0-2");
-        iter.next().get(1).equals("0-3");
-        iter.next().get(1).equals("0-4");
-        iter.next().get(1).equals("1-0");
-        iter.next().get(1).equals("1-1");
-        iter.next().get(1).equals("1-1");
-        iter.next().get(1).equals("1-2");
-        iter.next().get(1).equals("1-3");
-        iter.next().get(1).equals("1-4");
+        assertEquals(iter.next().get(1),"0-0");
+        assertEquals(iter.next().get(1),"0-1");
+        assertEquals(iter.next().get(1),"0-2");
+        assertEquals(iter.next().get(1),"0-3");
+        assertEquals(iter.next().get(1),"0-4");
+        assertEquals(iter.next().get(1),"1-0");
+        assertEquals(iter.next().get(1),"1-1");
+        assertEquals(iter.next().get(1),"1-2");
+        assertEquals(iter.next().get(1),"1-3");
+        assertEquals(iter.next().get(1),"1-4");
+    }
+
+    @Test
+    public void testRANDOMWithJob() throws Exception {
+        Util.resetStateForExecModeSwitch();
+        String inputFileName = "testRANDOM.txt";
+        Util.createInputFile(cluster, inputFileName, new String[]
+            {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        // running with two mappers
+        pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+        pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
+        pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+        pigServer.registerQuery("B = foreach A generate name, RANDOM();");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        double [] mapper1 = new double[5];
+        double [] mapper2 = new double[5];
+        for( int i = 0; i < 5; i++ ){
+            mapper1[i] = (Double) iter.next().get(1);
+            if( i != 0 ) {
+                // making sure it's not creating same value
+                assertNotEquals(mapper1[i-1], mapper1[i], 0.0001);
+            }
+        }
+        for( int i = 0; i < 5; i++ ){
+            mapper2[i] = (Double) iter.next().get(1);
+            if( i != 0 ) {
+                // making sure it's not creating same value
+                assertNotEquals(mapper2[i-1], mapper2[i], 0.0001);
+            }
+        }
+        // making sure different mappers are creating different random values
+        for( int i = 0; i < 5; i++ ){
+            assertNotEquals(mapper1[i], mapper2[i], 0.0001);
+        }
+    }
+
+
+    @Test
+    public void testRANDOM() throws Exception {
+        PigMapReduce.sJobConfInternal.set(new Configuration());
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "999");
+
+        org.apache.pig.builtin.RANDOM r = new org.apache.pig.builtin.RANDOM();
+        double [] tmpresult = new double [5];
+
+        for( int i = 0; i < 5 ; i++ ) {
+            tmpresult[i] = r.exec(null).doubleValue();
+            LOG.info("Return value of RANDOM(): " + tmpresult[i]);
+            if( i != 0 ) {
+                //making sure RANDOM isn't returning some fixed number
+                assertNotEquals(tmpresult[i-1], tmpresult[i], 0.0001);
+            }
+        }
+
+        // with different task id, random should return different number
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "888");
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
+        }
+
+        // with different jobid, random should return completely different number
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_222");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "999");
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
+        }
+
+        // with same jobid and taskid, random should return exact same sequence
+        // of pseudo-random number
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "999");
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
+        }
     }
 
     @Test



Mime
View raw message