From: yanz@apache.org
To: pig-commits@incubator.apache.org
Subject: svn commit: r906244 - in /hadoop/pig/branches/branch-0.6/contrib/zebra: CHANGES.txt src/java/org/apache/hadoop/zebra/pig/TableStorer.java src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java
Date: Wed, 03 Feb 2010 21:08:18 -0000
Reply-To: pig-dev@hadoop.apache.org

Author: yanz
Date: Wed Feb  3 21:08:18 2010
New Revision: 906244

URL: http://svn.apache.org/viewvc?rev=906244&view=rev
Log:
PIG-1206 Storing descendingly sorted PIG table as unsorted table (yanz)

Added:
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java
Modified:
    hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=906244&r1=906243&r2=906244&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Wed Feb  3 21:08:18 2010
@@ -6,6 +6,8 @@
 
   IMPROVEMENTS
 
+    PIG-1206 Storing descendingly sorted PIG table as unsorted table (yanz)
+
     PIG-1125 Map/Reduce API Changes (Chao Wang via yanz)
 
     PIG-1104 Streaming Support (Chao Wang via yanz)
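For context on the change below, here is a minimal, illustrative sketch (not part of this commit) of the Pig-level flow that the new test in this commit exercises: a relation is ordered descendingly and then stored through Zebra's TableStorer. The paths and aliases are hypothetical; the storage-spec string mirrors the one used by the new test, and a local Pig setup is assumed rather than the test's MiniCluster.

// Illustrative sketch only -- not part of this commit.
// ORDER ... DESC followed by a Zebra store; with this change TableStorer
// logs a warning and writes the output as an unsorted table.
import org.apache.hadoop.zebra.pig.TableStorer;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class DescendingStoreSketch {
  public static void main(String[] args) throws Exception {
    // Local execution for illustration; the new test below uses a MiniCluster.
    PigServer pigServer = new PigServer(ExecType.LOCAL);

    // Load an existing Zebra table (input path is hypothetical).
    pigServer.registerQuery("records = LOAD '/tmp/zebraInput' "
        + "USING org.apache.hadoop.zebra.pig.TableLoader();");

    // Sort descendingly on one of the columns.
    pigServer.registerQuery("srecs = ORDER records BY SF_a DESC;");

    // Store through Zebra; the descending sort column triggers the warning
    // added in TableStorer.java below, and the table is written unsorted.
    pigServer.store("srecs", "/tmp/zebraOutput",
        TableStorer.class.getCanonicalName() + "('[SF_a, SF_b, SF_c]; [SF_e]')");
  }
}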
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=906244&r1=906243&r2=906244&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Wed Feb  3 21:08:18 2010
@@ -23,6 +23,8 @@
 import java.lang.reflect.Constructor;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -43,6 +45,7 @@
 import org.apache.pig.StoreConfig;
 import org.apache.pig.CommittableStoreFunc;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.SortColInfo.Order;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 
@@ -128,19 +131,28 @@
     {
       org.apache.pig.SortColInfo sortColumn;
       String sortColumnName;
+      boolean descending = false;
       for (int i = 0; i < sortColumns.size(); i++)
       {
        sortColumn = sortColumns.get(i);
        sortColumnName = sortColumn.getColName();
        if (sortColumnName == null)
          throw new IOException("Zebra does not support column positional reference yet");
+        if (sortColumn.getSortOrder() == Order.DESCENDING)
+        {
+          Log LOG = LogFactory.getLog(TableLoader.class);
+          LOG.warn("Sorting in descending order is not supported by Zebra and the table will be unsorted.");
+          descending = true;
+          break;
+        }
        if (!org.apache.pig.data.DataType.isAtomic(schema.getField(sortColumnName).type))
          throw new IOException(schema.getField(sortColumnName).alias+" is not of simple type as required for a sort column now.");
        if (i > 0)
          sb.append(",");
        sb.append(sortColumnName);
      }
-      sortColumnNames = sb.toString();
+      if (!descending)
+        sortColumnNames = sb.toString();
     }
   }
   try {
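To make the new control flow easier to read outside of diff context, the following is a small, illustrative restatement (not code from this commit; the class and method names are hypothetical) of the check added above: as soon as a descending sort column is encountered, a warning is logged and the comma-separated sort-column list is abandoned, so the table is stored unsorted. The atomic-type check against the Pig schema is omitted here for brevity.

// Illustrative sketch only -- a standalone restatement of the descending-order
// handling added to TableStorer above; class and method names are hypothetical.
import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.SortColInfo;
import org.apache.pig.SortColInfo.Order;

class SortColumnNamesSketch {
  private static final Log LOG = LogFactory.getLog(SortColumnNamesSketch.class);

  /**
   * Builds the comma-separated sort-column list Zebra expects, or returns
   * null (meaning "store the table unsorted") if any column is sorted
   * descendingly.
   */
  static String buildSortColumnNames(List<SortColInfo> sortColumns) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < sortColumns.size(); i++) {
      SortColInfo sortColumn = sortColumns.get(i);
      String sortColumnName = sortColumn.getColName();
      if (sortColumnName == null)
        throw new IOException("Zebra does not support column positional reference yet");
      if (sortColumn.getSortOrder() == Order.DESCENDING) {
        LOG.warn("Sorting in descending order is not supported by Zebra"
            + " and the table will be unsorted.");
        return null; // corresponds to leaving sortColumnNames unset in TableStorer
      }
      if (i > 0)
        sb.append(",");
      sb.append(sortColumnName);
    }
    return sb.toString();
  }
}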
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java?rev=906244&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java Wed Feb  3 21:08:18 2010
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ *
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the Classpath of the
+ * app/debug configuration, when run this from inside the Eclipse.
+ *
+ */
+public class TestTableSortStorerDesc {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableSortStorerDesc");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, (9-b) + "_" + i + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Return the name of the routine that called getCurrentMethodName
+   *
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...'
+    return t;
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    /*
+     * Use pig LOAD to load testing data for store
+     */
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    Iterator<Tuple> it2 = pigServer.openIterator("records");
+    int row0 = 0;
+    Tuple RowValue2 = null;
+    while (it2.hasNext()) {
+      // Last row value
+      RowValue2 = it2.next();
+      row0++;
+      if (row0 == 10) {
+        Assert.assertEquals("0_01", RowValue2.get(1));
+        Assert.assertEquals("0_00", RowValue2.get(0).toString());
+      }
+    }
+    Assert.assertEquals(10, row0);
+
+    String orderby = "srecs = ORDER records BY SF_a DESC;";
+    pigServer.registerQuery(orderby);
+
+    /*
+     * Use pig STORE to store testing data BasicTable.Writer writer = new
+     * BasicTable.Writer(pathTable, "SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g",
+     * "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", false, conf);
+     */
+    Path newPath = new Path(getCurrentMethodName());
+
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString(),
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    // check new table content
+    String query3 = "newRecords = LOAD '"
+        + newPath.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b');";
+    pigServer.registerQuery(query3);
+
+    Iterator<Tuple> it3 = pigServer.openIterator("newRecords");
+    int row = 0;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue3 = it3.next();
+      row++;
+    }
+    Assert.assertEquals(10, row);
+  }
+}