From hcatalog-commits-return-110-apmail-incubator-hcatalog-commits-archive=incubator.apache.org@incubator.apache.org Wed May 4 17:51:06 2011 Return-Path: X-Original-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 686F138A6 for ; Wed, 4 May 2011 17:51:06 +0000 (UTC) Received: (qmail 28854 invoked by uid 500); 4 May 2011 17:51:06 -0000 Delivered-To: apmail-incubator-hcatalog-commits-archive@incubator.apache.org Received: (qmail 28829 invoked by uid 500); 4 May 2011 17:51:06 -0000 Mailing-List: contact hcatalog-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hcatalog-dev@incubator.apache.org Delivered-To: mailing list hcatalog-commits@incubator.apache.org Received: (qmail 28822 invoked by uid 99); 4 May 2011 17:51:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 May 2011 17:51:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 May 2011 17:51:04 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2DFDD2388A38; Wed, 4 May 2011 17:50:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1099539 [3/3] - in /incubator/hcatalog/trunk/src: java/org/apache/hcatalog/mapreduce/ java/org/apache/hcatalog/pig/ test/org/apache/hcatalog/mapreduce/ test/org/apache/hcatalog/pig/ Date: Wed, 04 May 2011 17:50:43 -0000 To: hcatalog-commits@incubator.apache.org From: macyang@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110504175044.2DFDD2388A38@eris.apache.org> Added: 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java?rev=1099539&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java (added) +++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java Wed May 4 17:50:42 2011 @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.hcatalog.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import junit.framework.TestCase;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hcatalog.MiniCluster;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;

/**
 * Tests for storing Pig relations through {@code org.apache.hcatalog.pig.HCatEximStorer}.
 *
 * <p>Each test writes a small comma-separated employee file to the mini cluster, runs a Pig
 * batch that stores it with {@code HCatEximStorer}, and then reads the {@code _metadata} file
 * under the export location with {@link EximUtil#readMetaData} to check the recorded table and
 * partition descriptions.
 */
public class TestHCatEximStorer extends TestCase {

  private static final String NONPART_TABLE = "junit_unparted";
  private static final String PARTITIONED_TABLE = "junit_parted";
  private static MiniCluster cluster = MiniCluster.buildCluster();

  private static final String dataLocation = "/tmp/data";
  private static String fqdataLocation;
  private static final String exportLocation = "/tmp/export";
  private static String fqexportLocation;

  private static Properties props;

  /** Pig schema of the first four columns of the input file. */
  private static final String EMP_SCHEMA =
      "emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray";

  /** Pig schema of all six columns of the input file, including the partition columns. */
  private static final String EMP_SCHEMA_WITH_PARTITION_COLUMNS =
      EMP_SCHEMA + ", emp_country:chararray, emp_state:chararray";

  /** Removes the input data file and the export directory from the mini cluster. */
  private void cleanup() throws IOException {
    MiniCluster.deleteFile(cluster, dataLocation);
    MiniCluster.deleteFile(cluster, exportLocation);
  }

  /**
   * Points the Pig properties at the mini cluster's file system, derives the fully qualified
   * data and export locations, and clears leftovers from earlier runs.
   */
  @Override
  protected void setUp() throws Exception {
    String defaultFs = cluster.getProperties().getProperty("fs.default.name");
    props = new Properties();
    props.setProperty("fs.default.name", defaultFs);
    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
        + ", fs.default.name : " + props.getProperty("fs.default.name"));
    fqdataLocation = defaultFs + dataLocation;
    fqexportLocation = defaultFs + exportLocation;
    System.out.println("FQ Data Location :" + fqdataLocation);
    System.out.println("FQ Export Location :" + fqexportLocation);
    cleanup();
  }

  /** Removes the files created by the test. */
  @Override
  protected void tearDown() throws Exception {
    cleanup();
  }

  /** (Re)creates the four-row, six-column employee input file on the mini cluster. */
  private void populateDataFile() throws IOException {
    MiniCluster.deleteFile(cluster, dataLocation);
    String[] input = new String[] {
        "237,Krishna,01/01/1990,M,IN,TN",
        "238,Kalpana,01/01/2000,F,IN,KA",
        "239,Satya,01/01/2001,M,US,TN",
        "240,Kavya,01/01/2002,F,US,KA"
    };
    MiniCluster.createInputFile(cluster, dataLocation, input);
  }

  /**
   * Creates a local-mode Pig server in batch mode using the test properties.
   *
   * @return a server on which queries can be registered and then run with {@code executeBatch()}
   */
  private PigServer newBatchServer() throws Exception {
    PigServer server = new PigServer(ExecType.LOCAL, props);
    UDFContext.getUDFContext().setClientSystemProps();
    server.setBatchOn();
    return server;
  }

  /**
   * Builds the Pig statement that loads the input file into relation {@code A}.
   *
   * @param schema the Pig schema to put in the {@code as (...)} clause
   */
  private static String loadQuery(String schema) {
    return "A = load '" + fqdataLocation + "' using PigStorage(',') as (" + schema + ");";
  }

  /**
   * Builds a Pig {@code store} statement that uses {@code HCatEximStorer}.
   *
   * @param alias the relation to store
   * @param tableName the name given after {@code into}
   * @param extraStorerArgs arguments passed to the storer after the export location, each
   *     emitted as a single-quoted string; the tests pass a partition specification and,
   *     optionally, a schema
   */
  private static String storeQuery(String alias, String tableName, String... extraStorerArgs) {
    StringBuilder query = new StringBuilder();
    query.append("store ").append(alias).append(" into '").append(tableName)
        .append("' using org.apache.hcatalog.pig.HCatEximStorer('").append(fqexportLocation)
        .append("'");
    for (String arg : extraStorerArgs) {
      query.append(", '").append(arg).append("'");
    }
    return query.append(");").toString();
  }

  /**
   * Reads the {@code _metadata} file under the export location.
   *
   * @return the exported table paired with its exported partitions
   */
  private Map.Entry<Table, List<Partition>> readExportedMetadata() throws Exception {
    FileSystem fs = cluster.getFileSystem();

    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
        + ", fs.default.name : " + props.getProperty("fs.default.name"));

    return EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata"));
  }

  /**
   * Builds the expected column list: one int column followed by string columns, all with an
   * empty comment.
   *
   * @param intColumn name of the leading int column
   * @param stringColumns names of the string columns that follow, in order
   */
  private static List<HCatFieldSchema> expectedColumns(String intColumn, String... stringColumns)
      throws Exception {
    List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>();
    columns.add(HCatSchemaUtils.getHCatFieldSchema(
        new FieldSchema(intColumn, Constants.INT_TYPE_NAME, "")));
    for (String name : stringColumns) {
      columns.add(HCatSchemaUtils.getHCatFieldSchema(
          new FieldSchema(name, Constants.STRING_TYPE_NAME, "")));
    }
    return columns;
  }

  /** Asserts that the given parameters name the RCFile input and output storage drivers. */
  private static void assertRcFileDrivers(Map<String, String> parameters) {
    assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
        parameters.get(HCatConstants.HCAT_ISD_CLASS));
    assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
        parameters.get(HCatConstants.HCAT_OSD_CLASS));
  }

  /**
   * Asserts the table-level metadata shared by all successful store tests: database
   * {@code default}, the expected name and columns, and RCFile drivers, formats and serde.
   */
  private static void assertRcFileTable(Table table, String expectedName,
      List<HCatFieldSchema> expectedColumns) throws Exception {
    assertEquals("default", table.getDbName());
    assertEquals(expectedName, table.getTableName());
    assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
        HCatUtil.getFieldSchemaList(expectedColumns)));
    assertRcFileDrivers(table.getParameters());
    assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
        table.getSd().getInputFormat());
    assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
        table.getSd().getOutputFormat());
    assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
        table.getSd().getSerdeInfo().getSerializationLib());
  }

  /** Asserts that the table is partitioned by {@code emp_country} then {@code emp_state}. */
  private static void assertCountryStatePartitionKeys(Table table) {
    assertEquals(2, table.getPartitionKeys().size());
    List<FieldSchema> partSchema = table.getPartitionKeys();
    assertEquals("emp_country", partSchema.get(0).getName());
    assertEquals("emp_state", partSchema.get(1).getName());
  }

  /** Stores without a partition specification and expects no partition keys or partitions. */
  public void testStoreNonPartTable() throws Exception {
    populateDataFile();
    PigServer server = newBatchServer();
    server.registerQuery(loadQuery(EMP_SCHEMA));
    server.registerQuery(storeQuery("A", NONPART_TABLE));
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportedMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();

    assertRcFileTable(table, NONPART_TABLE,
        expectedColumns("emp_id", "emp_name", "emp_dob", "emp_sex"));
    assertEquals(0, table.getPartitionKeys().size());

    assertEquals(0, partitions.size());
  }

  /** Stores with a single partition specification and expects exactly that partition. */
  public void testStorePartTable() throws Exception {
    populateDataFile();
    PigServer server = newBatchServer();
    server.registerQuery(loadQuery(EMP_SCHEMA));
    server.registerQuery(storeQuery("A", PARTITIONED_TABLE, "emp_country=in,emp_state=tn"));
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportedMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();

    assertRcFileTable(table, PARTITIONED_TABLE,
        expectedColumns("emp_id", "emp_name", "emp_dob", "emp_sex"));
    assertCountryStatePartitionKeys(table);

    assertEquals(1, partitions.size());
    Partition partition = partitions.get(0);
    assertEquals("in", partition.getValues().get(0));
    assertEquals("tn", partition.getValues().get(1));
    assertRcFileDrivers(partition.getParameters());
  }

  /**
   * Stores with an explicit schema whose column types match the loaded relation but whose
   * column names differ, and expects the exported table to carry the explicit names.
   */
  public void testStoreNonPartCompatSchemaTable() throws Exception {
    populateDataFile();
    PigServer server = newBatchServer();
    server.registerQuery(loadQuery(EMP_SCHEMA));
    server.registerQuery(storeQuery("A", NONPART_TABLE, "",
        "id:int, name:chararray, dob:chararray, sex:chararray"));
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportedMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();

    assertRcFileTable(table, NONPART_TABLE, expectedColumns("id", "name", "dob", "sex"));
    assertEquals(0, table.getPartitionKeys().size());

    assertEquals(0, partitions.size());
  }

  /**
   * Stores with an explicit schema that declares {@code sex} as {@code int} although the loaded
   * relation has it as {@code chararray}, and expects the batch to fail with a
   * {@link FrontendException}.
   */
  public void testStoreNonPartNonCompatSchemaTable() throws Exception {
    populateDataFile();
    PigServer server = newBatchServer();
    server.registerQuery(loadQuery(EMP_SCHEMA));
    server.registerQuery(storeQuery("A", NONPART_TABLE, "",
        "id:int, name:chararray, dob:chararray, sex:int"));
    try {
      server.executeBatch();
      fail("Expected exception not thrown");
    } catch (FrontendException expected) {
      // Intentionally ignored: the test passes only when the batch is rejected.
    }
  }

  /**
   * Splits the input by country and state, stores each slice under its own partition
   * specification in one batch, and expects all four partitions in the exported metadata.
   */
  public void testStoreMultiPartTable() throws Exception {
    populateDataFile();
    PigServer server = newBatchServer();
    server.registerQuery(loadQuery(EMP_SCHEMA_WITH_PARTITION_COLUMNS));
    server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
    server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
    server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
    server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
    server.registerQuery(storeQuery("INTN", PARTITIONED_TABLE, "emp_country=in,emp_state=tn"));
    server.registerQuery(storeQuery("INKA", PARTITIONED_TABLE, "emp_country=in,emp_state=ka"));
    server.registerQuery(storeQuery("USTN", PARTITIONED_TABLE, "emp_country=us,emp_state=tn"));
    server.registerQuery(storeQuery("USKA", PARTITIONED_TABLE, "emp_country=us,emp_state=ka"));
    server.executeBatch();

    Map.Entry<Table, List<Partition>> metadata = readExportedMetadata();
    Table table = metadata.getKey();
    List<Partition> partitions = metadata.getValue();

    assertRcFileTable(table, PARTITIONED_TABLE,
        expectedColumns("emp_id", "emp_name", "emp_dob", "emp_sex"));
    assertCountryStatePartitionKeys(table);

    assertEquals(4, partitions.size());
    Set<String> parts = new TreeSet<String>();
    parts.add("in,tn");
    parts.add("in,ka");
    parts.add("us,tn");
    parts.add("us,ka");

    for (Partition partition : partitions) {
      assertRcFileDrivers(partition.getParameters());
      // remove() returns false for an unexpected or repeated partition value pair.
      assertTrue(parts.remove(partition.getValues().get(0) + "," + partition.getValues().get(1)));
    }
    assertEquals(0, parts.size());
  }
}