From commits-return-8322-archive-asf-public=cust-asf.ponee.io@pig.apache.org Fri Jan 25 22:51:44 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 54C00180608 for ; Fri, 25 Jan 2019 22:51:43 +0100 (CET) Received: (qmail 4228 invoked by uid 500); 25 Jan 2019 21:51:42 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 4219 invoked by uid 99); 25 Jan 2019 21:51:42 -0000 Received: from Unknown (HELO svn01-us-west.apache.org) (209.188.14.144) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jan 2019 21:51:42 +0000 Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id B577E3A0116 for ; Fri, 25 Jan 2019 21:51:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1852183 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/hadoop/executionengine/util/ test/org/apache/pig/test/ Date: Fri, 25 Jan 2019 21:51:41 -0000 To: commits@pig.apache.org From: knoguchi@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20190125215141.B577E3A0116@svn01-us-west.apache.org> Author: knoguchi Date: Fri Jan 25 21:51:40 2019 New Revision: 1852183 URL: http://svn.apache.org/viewvc?rev=1852183&view=rev Log: PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi) Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Fri Jan 25 21:51:40 2019 @@ -87,6 +87,7 @@ PIG-5251: Bump joda-time to 2.9.9 (dbist OPTIMIZATIONS BUG FIXES +PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi) PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Jan 25 21:51:40 2019 @@ -112,8 +112,6 @@ public class SkewedPartitioner extends P @Override public void setConf(Configuration job) { conf = job; - PigMapReduce.sJobConfInternal.set(conf); - PigMapReduce.sJobConf = conf; } @Override Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Jan 25 21:51:40 2019 @@ -93,10 +93,10 @@ public class MapRedUtil { conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal")); } - if (PigMapReduce.sJobConfInternal.get().get("fs.file.impl")!=null) - conf.set("fs.file.impl", PigMapReduce.sJobConfInternal.get().get("fs.file.impl")); - if (PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")!=null) - conf.set("fs.hdfs.impl", PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")); + if (mapConf.get("fs.file.impl")!=null) + conf.set("fs.file.impl", mapConf.get("fs.file.impl")); + if (mapConf.get("fs.hdfs.impl")!=null) + conf.set("fs.hdfs.impl", mapConf.get("fs.hdfs.impl")); copyTmpFileConfigurationValues(PigMapReduce.sJobConfInternal.get(), conf); Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1852183&r1=1852182&r2=1852183&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Jan 25 21:51:40 2019 @@ -207,7 +207,6 @@ public class TestSkewedJoin { assertEquals(0, count); } - @Test public void testSkewedJoinWithGroup() throws IOException{ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); @@ -354,7 +353,7 @@ public class TestSkewedJoin { try { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); { - pigServer.registerQuery("C = join A by id, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -375,7 +374,7 @@ public class TestSkewedJoin { pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);"); DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); { - 
pigServer.registerQuery("C = join A by id left, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id left, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -383,7 +382,7 @@ public class TestSkewedJoin { } } { - pigServer.registerQuery("C = join A by id right, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id right, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -391,7 +390,7 @@ public class TestSkewedJoin { } } { - pigServer.registerQuery("C = join A by id full, B by id using 'skewed';"); + pigServer.registerQuery("C = join A by id full, B by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); while(iter.hasNext()) { @@ -413,7 +412,7 @@ public class TestSkewedJoin { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag(); { - pigServer.registerQuery("E = join C by id, D by id using 'skewed';"); + pigServer.registerQuery("E = join C by id, D by id using 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("E"); while(iter.hasNext()) { @@ -487,7 +486,7 @@ public class TestSkewedJoin { pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);"); pigServer.registerQuery("b = load 'right.dat' as (number:chararray,text:chararray);"); pigServer.registerQuery("c = filter a by nums == '7';"); - pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed';"); + pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("d"); @@ -515,7 +514,7 @@ public class TestSkewedJoin { pigServer.registerQuery("a = load 'foo' as (nums:chararray);"); pigServer.registerQuery("b = load 'foo' as (nums:chararray);"); - pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed';"); + pigServer.registerQuery("d = join a by nums, b by 
nums USING 'skewed' parallel 2;"); Iterator<Tuple> iter = pigServer.openIterator("d"); int count = 0; @@ -569,7 +568,7 @@ public class TestSkewedJoin { "exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" + "missing = LOAD '/non/existing/directory' AS (a:long);" + "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" + - "joined = JOIN exists BY a, missing BY a USING 'skewed';"; + "joined = JOIN exists BY a, missing BY a USING 'skewed' parallel 2;"; String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath(); Logger logger = Logger.getLogger("org.apache.pig"); @@ -619,4 +618,34 @@ public class TestSkewedJoin { } } + // PIG-5372 + @Test + public void testSkewedJoinWithRANDOMudf() throws IOException{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);"); + pigServer.registerQuery("A2 = FOREACH A GENERATE id, RANDOM() as randnum;"); + + DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("D = join A2 by id, B by id using 'skewed' parallel 2;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbfrj.add(iter.next()); + } + } + { + pigServer.registerQuery("D = join A2 by id, B by id;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + assertTrue(dbfrj.size()>0); + assertTrue(dbshj.size()>0); + assertEquals(dbfrj.size(), dbshj.size()); + } + + }