incubator-hcatalog-commits mailing list archives

From macy...@apache.org
Subject svn commit: r1099539 [3/3] - in /incubator/hcatalog/trunk/src: java/org/apache/hcatalog/mapreduce/ java/org/apache/hcatalog/pig/ test/org/apache/hcatalog/mapreduce/ test/org/apache/hcatalog/pig/
Date Wed, 04 May 2011 17:50:43 GMT
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java Wed May  4 17:50:42 2011
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+
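+/**
+ * Tests for HCatEximStorer: each test stores a Pig relation to an export
+ * location on the MiniCluster filesystem, then reads back the _metadata file
+ * written alongside the data and checks the recorded table and partition
+ * definitions (column schema, RCFile input/output drivers and formats,
+ * ColumnarSerDe, partition keys and values).
+ */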
+public class TestHCatEximStorer extends TestCase {
+
+  private static final String NONPART_TABLE = "junit_unparted";
+  private static final String PARTITIONED_TABLE = "junit_parted";
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+
+  private static final String dataLocation = "/tmp/data";
+  private static String fqdataLocation;
+  private static final String exportLocation = "/tmp/export";
+  private static String fqexportLocation;
+
+  private static Properties props;
+
+  private void cleanup() throws IOException {
+    MiniCluster.deleteFile(cluster, dataLocation);
+    MiniCluster.deleteFile(cluster, exportLocation);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+        + ", fs.default.name : " + props.getProperty("fs.default.name"));
+    fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation;
+    fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation;
+    System.out.println("FQ Data Location :" + fqdataLocation);
+    System.out.println("FQ Export Location :" + fqexportLocation);
+    cleanup();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    cleanup();
+  }
+
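+  // Writes four sample CSV rows (id, name, dob, sex, country, state)
+  // to dataLocation, replacing any existing file.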
+  private void populateDataFile() throws IOException {
+    MiniCluster.deleteFile(cluster, dataLocation);
+    String[] input = new String[] {
+        "237,Krishna,01/01/1990,M,IN,TN",
+        "238,Kalpana,01/01/2000,F,IN,KA",
+        "239,Satya,01/01/2001,M,US,TN",
+        "240,Kavya,01/01/2002,F,US,KA"
+    };
+    MiniCluster.createInputFile(cluster, dataLocation, input);
+  }
+
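+  // Stores an unpartitioned relation and verifies the exported table
+  // metadata: default database, four-column schema, RCFile drivers and
+  // formats, ColumnarSerDe, and no partition keys or partitions.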
+  public void testStoreNonPartTable() throws Exception {
+    populateDataFile();
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int,
emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+    server.registerQuery("store A into '" + NONPART_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');");
+    server.executeBatch();
+
+    FileSystem fs = cluster.getFileSystem();
+
+    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+        + ", fs.default.name : " + props.getProperty("fs.default.name"));
+
+    Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new
Path(exportLocation, "_metadata"));
+    Table table = metadata.getKey();
+    List<Partition> partitions = metadata.getValue();
+
+    List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>();
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+        Constants.INT_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+        Constants.STRING_TYPE_NAME, "")));
+
+
+    assertEquals("default", table.getDbName());
+    assertEquals(NONPART_TABLE, table.getTableName());
+    assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+        HCatUtil.getFieldSchemaList(columns)));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+        table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+        table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+        table.getSd().getInputFormat());
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+        table.getSd().getOutputFormat());
+    assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+        table.getSd().getSerdeInfo().getSerializationLib());
+    assertEquals(0, table.getPartitionKeys().size());
+
+    assertEquals(0, partitions.size());
+  }
+
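+  // Stores into a single partition (emp_country=in, emp_state=tn) and
+  // verifies that the export records both partition keys and exactly one
+  // partition with the expected values and drivers.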
+  public void testStorePartTable() throws Exception {
+    populateDataFile();
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int,
emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+    server.registerQuery("store A into '" + PARTITIONED_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=tn');");
+    server.executeBatch();
+
+    FileSystem fs = cluster.getFileSystem();
+
+    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+        + ", fs.default.name : " + props.getProperty("fs.default.name"));
+
+    Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new
Path(exportLocation, "_metadata"));
+    Table table = metadata.getKey();
+    List<Partition> partitions = metadata.getValue();
+
+    List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>();
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+        Constants.INT_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+        Constants.STRING_TYPE_NAME, "")));
+
+
+    assertEquals("default", table.getDbName());
+    assertEquals(PARTITIONED_TABLE, table.getTableName());
+    assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+        HCatUtil.getFieldSchemaList(columns)));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+        table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+        table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+        table.getSd().getInputFormat());
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+        table.getSd().getOutputFormat());
+    assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+        table.getSd().getSerdeInfo().getSerializationLib());
+    assertEquals(2, table.getPartitionKeys().size());
+    List<FieldSchema> partSchema = table.getPartitionKeys();
+    assertEquals("emp_country", partSchema.get(0).getName());
+    assertEquals("emp_state", partSchema.get(1).getName());
+
+    assertEquals(1, partitions.size());
+    Partition partition = partitions.get(0);
+    assertEquals("in", partition.getValues().get(0));
+    assertEquals("tn", partition.getValues().get(1));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+        partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+        partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+  }
+
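+  // Supplies an explicit, compatible schema ('id:int, name:chararray, ...')
+  // as the third storer argument; the exported table should carry the
+  // supplied column names instead of the Pig aliases.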
+  public void testStoreNonPartCompatSchemaTable() throws Exception {
+    populateDataFile();
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int,
emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+    server.registerQuery("store A into '" + NONPART_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', '',
'id:int, name:chararray, dob:chararray, sex:chararray');");
+    server.executeBatch();
+
+    FileSystem fs = cluster.getFileSystem();
+
+    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+        + ", fs.default.name : " + props.getProperty("fs.default.name"));
+
+    Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new
Path(exportLocation, "_metadata"));
+    Table table = metadata.getKey();
+    List<Partition> partitions = metadata.getValue();
+
+    List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>();
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("id",
+        Constants.INT_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("name",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("dob",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("sex",
+        Constants.STRING_TYPE_NAME, "")));
+
+
+    assertEquals("default", table.getDbName());
+    assertEquals(NONPART_TABLE, table.getTableName());
+    assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+        HCatUtil.getFieldSchemaList(columns)));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+        table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+        table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+        table.getSd().getInputFormat());
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+        table.getSd().getOutputFormat());
+    assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+        table.getSd().getSerdeInfo().getSerializationLib());
+    assertEquals(0, table.getPartitionKeys().size());
+
+    assertEquals(0, partitions.size());
+  }
+
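+  // Supplies an incompatible schema (sex declared int while the Pig field
+  // is chararray); the store is expected to fail with a FrontendException.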
+  public void testStoreNonPartNonCompatSchemaTable() throws Exception {
+    populateDataFile();
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int,
emp_name:chararray, emp_dob:chararray, emp_sex:chararray);");
+    server.registerQuery("store A into '" + NONPART_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', '',
'id:int, name:chararray, dob:chararray, sex:int');");
+    try {
+      server.executeBatch();
+      fail("Expected exception not thrown");
+    } catch (FrontendException e) {
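+      // expected: supplied schema is incompatible with the Pig schema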
+    }
+  }
+
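+  // Filters the input by country/state and stores each subset as its own
+  // partition of the same export; verifies all four partitions are recorded.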
+  public void testStoreMultiPartTable() throws Exception {
+    populateDataFile();
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int,
emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);");
+    server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';");
+    server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';");
+    server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';");
+    server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';");
+    server.registerQuery("store INTN into '" + PARTITIONED_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=tn');");
+    server.registerQuery("store INKA into '" + PARTITIONED_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=ka');");
+    server.registerQuery("store USTN into '" + PARTITIONED_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=us,emp_state=tn');");
+    server.registerQuery("store USKA into '" + PARTITIONED_TABLE
+        + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=us,emp_state=ka');");
+    server.executeBatch();
+
+    FileSystem fs = cluster.getFileSystem();
+
+    System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName()
+        + ", fs.default.name : " + props.getProperty("fs.default.name"));
+
+    Map.Entry<Table, List<Partition>> metadata = EximUtil.readMetaData(fs, new
Path(exportLocation, "_metadata"));
+    Table table = metadata.getKey();
+    List<Partition> partitions = metadata.getValue();
+
+    List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>();
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id",
+        Constants.INT_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob",
+        Constants.STRING_TYPE_NAME, "")));
+    columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex",
+        Constants.STRING_TYPE_NAME, "")));
+
+
+    assertEquals("default", table.getDbName());
+    assertEquals(PARTITIONED_TABLE, table.getTableName());
+    assertTrue(EximUtil.schemaCompare(table.getSd().getCols(),
+        HCatUtil.getFieldSchemaList(columns)));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+        table.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+    assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+        table.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+        table.getSd().getInputFormat());
+    assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+        table.getSd().getOutputFormat());
+    assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+        table.getSd().getSerdeInfo().getSerializationLib());
+    assertEquals(2, table.getPartitionKeys().size());
+    List<FieldSchema> partSchema = table.getPartitionKeys();
+    assertEquals("emp_country", partSchema.get(0).getName());
+    assertEquals("emp_state", partSchema.get(1).getName());
+
+    assertEquals(4, partitions.size());
+    Set<String> parts = new TreeSet<String>();
+    parts.add("in,tn");
+    parts.add("in,ka");
+    parts.add("us,tn");
+    parts.add("us,ka");
+
+    for (Partition partition : partitions) {
+      assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver",
+          partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS));
+      assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver",
+          partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS));
+      assertTrue(parts.remove(partition.getValues().get(0) + "," + partition.getValues().get(1)));
+    }
+    assertEquals(0, parts.size());
+  }
+}


