Return-Path: Delivered-To: apmail-pig-commits-archive@www.apache.org Received: (qmail 62676 invoked from network); 23 Dec 2010 18:54:16 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 23 Dec 2010 18:54:16 -0000 Received: (qmail 12796 invoked by uid 500); 23 Dec 2010 18:54:16 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 12768 invoked by uid 500); 23 Dec 2010 18:54:16 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 12761 invoked by uid 99); 23 Dec 2010 18:54:16 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Dec 2010 18:54:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Dec 2010 18:54:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 04D0A2388A4B; Thu, 23 Dec 2010 18:53:54 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1052354 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/impl/i... Date: Thu, 23 Dec 2010 18:53:53 -0000 To: commits@pig.apache.org From: daijy@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101223185354.04D0A2388A4B@eris.apache.org> Author: daijy Date: Thu Dec 23 18:53:53 2010 New Revision: 1052354 URL: http://svn.apache.org/viewvc?rev=1052354&view=rev Log: PIG-1277: Pig should give error message when cogroup on tuple keys of different inner type Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java pig/trunk/src/org/apache/pig/impl/io/NullableBytesWritable.java pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java pig/trunk/test/org/apache/pig/test/TestPackage.java pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Thu Dec 23 18:53:53 2010 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-1277: Pig should give error message when cogroup on tuple keys of different inner type (daijy) + PIG-1755: Clean up duplicated code in PhysicalOperators (dvryaboy) PIG-750: Use combiner when algebraic UDFs are used in expressions (thejas) @@ -247,6 +249,8 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1771: New logical plan: Merge schema fail if LoadFunc.getSchema return different schema with "Load...AS" (daijy) + PIG-1766: New logical plan: ImplicitSplitInserter should before DuplicateForEachColumnRewrite (daijy) PIG-1762: Logical simplification fails on map key referenced values (yanz) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Thu Dec 23 18:53:53 2010 @@ -53,8 +53,12 @@ public class HDataType { static Map typeToName = null; public static PigNullableWritable getWritableComparableTypes(Object o, byte keyType) throws ExecException{ - byte type = DataType.findType(o); - switch (type) { + + byte newKeyType = keyType; + if (o==null) + newKeyType = DataType.NULL; + + switch (newKeyType) { case DataType.BAG: return new NullableBag((DataBag)o); @@ -62,7 +66,7 @@ public class HDataType { return new NullableBooleanWritable((Boolean)o); case DataType.BYTEARRAY: - return new NullableBytesWritable(((DataByteArray)o).get()); + return new NullableBytesWritable(o); case DataType.CHARARRAY: return new NullableText((String)o); @@ -138,7 +142,7 @@ public class HDataType { if (typeToName == null) typeToName = DataType.genTypeToNameMap(); int errCode = 2044; String msg = "The type " - + typeToName.get(type) + + typeToName.get(keyType) + " cannot be collected as a Key type"; throw new ExecException(msg, errCode, PigException.BUG); @@ -192,4 +196,30 @@ public class HDataType { } return wcKey; } + + public static byte findTypeFromNullableWritable(PigNullableWritable o) throws ExecException { + if (o instanceof NullableBooleanWritable) + return DataType.BOOLEAN; + else if (o instanceof NullableBytesWritable) + return DataType.BYTEARRAY; + else if (o instanceof NullableText) + return DataType.CHARARRAY; + else if (o instanceof NullableFloatWritable) + return DataType.FLOAT; + else if (o instanceof NullableDoubleWritable) + return DataType.DOUBLE; + else if (o instanceof NullableIntWritable) + return DataType.INTEGER; + else if (o instanceof NullableLongWritable) + return DataType.LONG; + else if (o instanceof NullableBag) + return DataType.BAG; + else if (o instanceof NullableTuple) + return DataType.TUPLE; + else { + int errCode = 2044; + String msg = "Cannot find Pig type for " + o.getClass().getName(); + throw new ExecException(msg, errCode, PigException.BUG); + } + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Thu Dec 23 18:53:53 2010 @@ -29,6 +29,7 @@ import org.apache.hadoop.io.WritableComp import org.apache.hadoop.mapred.JobConf; import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; import org.apache.pig.impl.io.NullableBytesWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -97,7 +98,7 @@ public class PigBytesRawComparator exten // If either are null, handle differently. if (!nbw1.isNull() && !nbw2.isNull()) { - rc = ((DataByteArray)nbw1.getValueAsPigType()).compareTo((DataByteArray)nbw2.getValueAsPigType()); + rc = DataType.compare(nbw1.getValueAsPigType(), nbw2.getValueAsPigType()); } else { // For sorting purposes two nulls are equal. if (nbw1.isNull() && nbw2.isNull()) rc = 0; Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Dec 23 18:53:53 2010 @@ -187,7 +187,7 @@ public class PigMapReduce { } PigNullableWritable key = - HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), DataType.TUPLE); + HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType); NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Thu Dec 23 18:53:53 2010 @@ -265,7 +265,7 @@ public class POMultiQueryPackage extends myObj.setNull(true); } else { - myObj = HDataType.getWritableComparableTypes(obj, (byte)0); + myObj = HDataType.getWritableComparableTypes(obj, HDataType.findTypeFromNullableWritable(curKey)); } myObj.setIndex(origIndex); tuple.set(0, myObj); Modified: pig/trunk/src/org/apache/pig/impl/io/NullableBytesWritable.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableBytesWritable.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/io/NullableBytesWritable.java (original) +++ pig/trunk/src/org/apache/pig/impl/io/NullableBytesWritable.java Thu Dec 23 18:53:53 2010 @@ -17,9 +17,9 @@ */ package org.apache.pig.impl.io; -import org.apache.hadoop.io.BytesWritable; - -import org.apache.pig.data.DataByteArray; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; /** * @@ -27,18 +27,26 @@ import org.apache.pig.data.DataByteArray public class NullableBytesWritable extends PigNullableWritable { public NullableBytesWritable() { - mValue = new BytesWritable(); + mValue = TupleFactory.getInstance().newTuple(); } /** - * @param bytes + * @param obj */ - public NullableBytesWritable(byte[] bytes) { - mValue = new BytesWritable(bytes); + public NullableBytesWritable(Object obj) { + mValue = TupleFactory.getInstance().newTuple(); + ((Tuple)mValue).append(obj); } public Object getValueAsPigType() { - BytesWritable bw = (BytesWritable)mValue; - return isNull() ? null : new DataByteArray(bw.getBytes(), 0, bw.getLength()); + if (isNull()) + return null; + Object obj = null; + try { + obj = ((Tuple)mValue).get(0); + } catch (ExecException e) { + throw new RuntimeException(e); + } + return obj; } } Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Thu Dec 23 18:53:53 2010 @@ -58,6 +58,8 @@ public class LOUnion extends LogicalRela return s0; LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema(); LogicalSchema mergedSchema = LogicalSchema.merge(s0, s1); + if (mergedSchema==null) + return null; // Merge schema for (int i=2;i iter = pigServer.openIterator("b"); + + Tuple t = iter.next(); + assertTrue(t.size()==4); + assertTrue(t.toString().equals("(hello,hello,(hello),[key#value])")); + } + + static public class MapGenerate extends EvalFunc { + @Override + public Map exec(Tuple input) throws IOException { + Map m = new HashMap(); + m.put("key", new Integer(input.size())); + return m; + } + + @Override + public Schema outputSchema(Schema input) { + return new Schema(new Schema.FieldSchema(null, DataType.MAP)); + } + } + + // See PIG-1277 + @Test + public void testWrappingUnknownKey1() throws Exception{ + String[] input1 = { + "1", + }; + + Util.createInputFile(cluster, "table_testWrappingUnknownKey1", input1); + + pigServer.registerQuery("a = load 'table_testWrappingUnknownKey1' as (a0);"); + pigServer.registerQuery("b = foreach a generate a0, "+ MapGenerate.class.getName() + "(*) as m:map[];"); + pigServer.registerQuery("c = foreach b generate a0, m#'key' as key;"); + pigServer.registerQuery("d = group c by key;"); + + Iterator iter = pigServer.openIterator("d"); + + Tuple t = iter.next(); + assertTrue(t.size()==2); + assertTrue(t.toString().equals("(1,{(1,1)})")); + assertFalse(iter.hasNext()); + } + + // See PIG-999 + @Test + public void testWrappingUnknownKey2() throws Exception{ + String[] input1 = { + "1", + }; + + Util.createInputFile(cluster, "table_testWrappingUnknownKey2", input1); + + pigServer.registerQuery("a = load 'table_testWrappingUnknownKey2' as (a0);"); + pigServer.registerQuery("b = foreach a generate a0, "+ MapGenerate.class.getName() + "(*) as m:map[];"); + pigServer.registerQuery("c = foreach b generate a0, m#'key' as key;"); + pigServer.registerQuery("d = order c by key;"); + + Iterator iter = pigServer.openIterator("d"); + + Tuple t = iter.next(); + assertTrue(t.size()==2); + assertTrue(t.toString().equals("(1,1)")); + assertFalse(iter.hasNext()); + } + + // See PIG-1065 + @Test + public void testWrappingUnknownKey3() throws Exception{ + String[] input1 = { + "1\t2", + "2\t3" + }; + + String[] input2 = { + "1", + }; + + Util.createInputFile(cluster, "table_testWrappingUnknownKey3_1", input1); + Util.createInputFile(cluster, "table_testWrappingUnknownKey3_2", input2); + + pigServer.registerQuery("a = load 'table_testWrappingUnknownKey3_1' as (a0:chararray, a1:chararray);"); + pigServer.registerQuery("b = load 'table_testWrappingUnknownKey3_2' as (b0:chararray);"); + pigServer.registerQuery("c = union a, b;"); + pigServer.registerQuery("d = order c by $0;"); + + Collection results = new HashSet(); + results.add("(1,2)"); + results.add("(1)"); + results.add("(2,3)"); + + Iterator iter = pigServer.openIterator("d"); + + Tuple t = iter.next(); + assertTrue(results.contains(t.toString())); + t = iter.next(); + assertTrue(results.contains(t.toString())); + t = iter.next(); + assertTrue(results.contains(t.toString())); + assertFalse(iter.hasNext()); + } } Modified: pig/trunk/test/org/apache/pig/test/TestPackage.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPackage.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPackage.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPackage.java Thu Dec 23 18:53:53 2010 @@ -56,7 +56,7 @@ public class TestPackage extends junit.f public void tearDown() throws Exception { } - private void runTest(Object key, boolean inner[]) throws ExecException, IOException { + private void runTest(Object key, boolean inner[], byte keyType) throws ExecException, IOException { Random r = new Random(); DataBag db1 = GenRandomData.genRandSmallTupDataBag(r, 10, 100); DataBag db2 = GenRandomData.genRandSmallTupDataBag(r, 10, 100); @@ -79,7 +79,7 @@ public class TestPackage extends junit.f POPackage pop = new POPackage(new OperatorKey("", r.nextLong())); pop.setNumInps(2); pop.setInner(inner); - PigNullableWritable k = HDataType.getWritableComparableTypes(key, (byte)0); + PigNullableWritable k = HDataType.getWritableComparableTypes(key, keyType); pop.attachInput(k, db.iterator()); // we are not doing any optimization to remove @@ -117,43 +117,43 @@ public class TestPackage extends junit.f Random r = new Random(); switch (t) { case DataType.BAG: - runTest(GenRandomData.genRandSmallTupDataBag(r, 10, 100),inner); + runTest(GenRandomData.genRandSmallTupDataBag(r, 10, 100),inner, DataType.BAG); break; case DataType.BOOLEAN: - runTest(r.nextBoolean(),inner); + runTest(r.nextBoolean(),inner, DataType.BOOLEAN); break; case DataType.BYTEARRAY: - runTest(GenRandomData.genRandDBA(r),inner); + runTest(GenRandomData.genRandDBA(r),inner, DataType.BYTEARRAY); break; case DataType.BIGCHARARRAY: { String s = GenRandomData.genRandString(r); for(;s.length() < 65535;) { s += GenRandomData.genRandString(r); } - runTest(s,inner); + runTest(s,inner, DataType.CHARARRAY); break; } case DataType.CHARARRAY: - runTest(GenRandomData.genRandString(r),inner); + runTest(GenRandomData.genRandString(r),inner, DataType.CHARARRAY); break; case DataType.DOUBLE: - runTest(r.nextDouble(),inner); + runTest(r.nextDouble(),inner, DataType.DOUBLE); break; case DataType.FLOAT: - runTest(r.nextFloat(),inner); + runTest(r.nextFloat(),inner, DataType.FLOAT); break; case DataType.INTEGER: - runTest(r.nextLong(),inner); + runTest(r.nextInt(),inner, DataType.INTEGER); break; case DataType.LONG: - runTest(r.nextLong(),inner); + runTest(r.nextLong(),inner, DataType.LONG); break; case DataType.MAP: case DataType.INTERNALMAP: case DataType.BYTE: return; // map not key type case DataType.TUPLE: - runTest(GenRandomData.genRandSmallBagTuple(r, 10, 100),inner); + runTest(GenRandomData.genRandSmallBagTuple(r, 10, 100),inner, DataType.TUPLE); break; default: Modified: pig/trunk/test/org/apache/pig/test/TestSecondarySort.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSecondarySort.java?rev=1052354&r1=1052353&r2=1052354&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestSecondarySort.java (original) +++ pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Thu Dec 23 18:53:53 2010 @@ -444,9 +444,9 @@ public class TestSecondarySort extends T .registerQuery("D = foreach C { E = limit A 10; F = E.a1; G = DISTINCT F; generate group, COUNT(G);};"); Iterator iter = pigServer.openIterator("D"); assertTrue(iter.hasNext()); - assertTrue(iter.next().toString().equals("(1,2)")); - assertTrue(iter.hasNext()); assertTrue(iter.next().toString().equals("(2,1)")); + assertTrue(iter.hasNext()); + assertTrue(iter.next().toString().equals("(1,2)")); assertFalse(iter.hasNext()); Util.deleteFile(cluster, tmpFile1.getCanonicalPath()); Util.deleteFile(cluster, tmpFile2.getCanonicalPath()); @@ -469,9 +469,9 @@ public class TestSecondarySort extends T pigServer.registerQuery("C = foreach B { D = distinct A; generate group, D;};"); Iterator iter = pigServer.openIterator("C"); assertTrue(iter.hasNext()); - assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,3,4)})")); - assertTrue(iter.hasNext()); assertTrue(iter.next().toString().equals("(2,{(2,3,4)})")); + assertTrue(iter.hasNext()); + assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,3,4)})")); assertFalse(iter.hasNext()); Util.deleteFile(cluster, tmpFile1.getCanonicalPath()); } @@ -493,9 +493,9 @@ public class TestSecondarySort extends T pigServer.registerQuery("C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};"); Iterator iter = pigServer.openIterator("C"); assertTrue(iter.hasNext()); - assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,2,4),(1,2,4),(1,3,4)})")); - assertTrue(iter.hasNext()); assertTrue(iter.next().toString().equals("(2,{(2,3,4)})")); + assertTrue(iter.hasNext()); + assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,2,4),(1,2,4),(1,3,4)})")); assertFalse(iter.hasNext()); Util.deleteFile(cluster, tmpFile1.getCanonicalPath()); } @@ -517,9 +517,9 @@ public class TestSecondarySort extends T pigServer.registerQuery("C = foreach B { D = order A by a1 desc; generate group, D;};"); Iterator iter = pigServer.openIterator("C"); assertTrue(iter.hasNext()); - assertEquals("(1,{(1,8,4),(1,4,4),(1,3,4),(1,2,3),(1,2,4)})", iter.next().toString()); - assertTrue(iter.hasNext()); assertEquals("(2,{(2,3,4)})", iter.next().toString()); + assertTrue(iter.hasNext()); + assertEquals("(1,{(1,8,4),(1,4,4),(1,3,4),(1,2,3),(1,2,4)})", iter.next().toString()); assertFalse(iter.hasNext()); Util.deleteFile(cluster, tmpFile1.getCanonicalPath()); }