incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tra...@apache.org
Subject svn commit: r1383152 [4/27] - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ s...
Date Mon, 10 Sep 2012 23:29:03 GMT
Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java Mon Sep 10 23:28:55 2012
@@ -49,393 +49,395 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 public class TestHCatLoader extends TestCase {
-  private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
-      "/build/test/data/" + TestHCatLoader.class.getCanonicalName();
-  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
-  private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
-  private static final String COMPLEX_FILE_NAME = TEST_DATA_DIR + "/complex.input.data";
-
-  private static final String BASIC_TABLE = "junit_unparted_basic";
-  private static final String COMPLEX_TABLE = "junit_unparted_complex";
-  private static final String PARTITIONED_TABLE = "junit_parted_basic";
-  private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
-  private static Driver driver;
-
-  private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass
-  private static boolean setupHasRun = false;
-
-  private static Map<Integer,Pair<Integer,String>> basicInputData;
-
-  private void dropTable(String tablename) throws IOException, CommandNeedRetryException{
-    driver.run("drop table "+tablename);
-  }
-  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException{
-    String createTable;
-    createTable = "create table "+tablename+"("+schema+") ";
-    if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
-      createTable = createTable + "partitioned by ("+partitionedBy+") ";
-    }
-    createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
-        "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
-    int retCode = driver.run(createTable).getResponseCode();
-    if(retCode != 0) {
-      throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+"]");
-    }
-  }
-
-  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException{
-    createTable(tablename,schema,null);
-  }
-
-  protected void guardedSetUpBeforeClass() throws Exception {
-    if (!setupHasRun){
-      setupHasRun = true;
-    }else{
-      return;
-    }
-
-    File f = new File(TEST_WAREHOUSE_DIR);
-    if (f.exists()) {
-      FileUtil.fullyDelete(f);
-    }
-    new File(TEST_WAREHOUSE_DIR).mkdirs();
-
-    HiveConf hiveConf = new HiveConf(this.getClass());
-    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
-    driver = new Driver(hiveConf);
-    SessionState.start(new CliSessionState(hiveConf));
-
-    cleanup();
-
-    createTable(BASIC_TABLE,"a int, b string");
-    createTable(COMPLEX_TABLE,
-        "name string, studentid int, "
-        + "contact struct<phno:string,email:string>, "
-        + "currently_registered_courses array<string>, "
-        + "current_grades map<string,string>, "
-        + "phnos array<struct<phno:string,type:string>>");
-
-    createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
-    createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
-
-    int LOOP_SIZE = 3;
-    String[] input = new String[LOOP_SIZE*LOOP_SIZE];
-    basicInputData = new HashMap<Integer,Pair<Integer,String>>();
-    int k = 0;
-    for(int i = 1; i <= LOOP_SIZE; i++) {
-      String si = i + "";
-      for(int j=1;j<=LOOP_SIZE;j++) {
-        String sj = "S"+j+"S";
-        input[k] = si + "\t" + sj;
-        basicInputData.put(k, new Pair<Integer,String>(i,sj));
-        k++;
-      }
-    }
-    HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
-    HcatTestUtils.createTestDataFile(COMPLEX_FILE_NAME,
-        new String[]{
-            //"Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)},[PHARMACOLOGY#A-,PSYCHIATRY#B+],{(415-253-6367,cell),(408-253-6367,landline)}",
-            //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
-        }
-    );
-
-    PigServer server = new PigServer(ExecType.LOCAL);
-    server.setBatchOn();
-    server.registerQuery("A = load '"+BASIC_FILE_NAME+"' as (a:int, b:chararray);");
-
-    server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
-    server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
-    server.registerQuery("B = foreach A generate a,b;");
-    server.registerQuery("B2 = filter B by a < 2;");
-    server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
-
-    server.registerQuery("C = foreach A generate a,b;");
-    server.registerQuery("C2 = filter C by a >= 2;");
-    server.registerQuery("store C2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
-
-    server.registerQuery("D = load '"+COMPLEX_FILE_NAME+"' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
-    server.registerQuery("store D into '"+COMPLEX_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
-    server.executeBatch();
-
-  }
-  private void cleanup() throws IOException, CommandNeedRetryException {
-    dropTable(BASIC_TABLE);
-    dropTable(COMPLEX_TABLE);
-    dropTable(PARTITIONED_TABLE);
-    dropTable(SPECIFIC_SIZE_TABLE);
-  }
-
-  protected void guardedTearDownAfterClass() throws Exception {
-    guardTestCount--;
-    if (guardTestCount > 0){
-      return;
-    }
-    cleanup();
-  }
-
-  @Override
-  protected void setUp() throws Exception {
-    guardedSetUpBeforeClass();
-  }
-
-  @Override
-  protected void tearDown() throws Exception {
-    guardedTearDownAfterClass();
-  }
-
-  public void testSchemaLoadBasic() throws IOException{
-
-    PigServer server = new PigServer(ExecType.LOCAL);
-
-    // test that schema was loaded correctly
-    server.registerQuery("X = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
-    Schema dumpedXSchema = server.dumpSchema("X");
-    List<FieldSchema> Xfields = dumpedXSchema.getFields();
-    assertEquals(2,Xfields.size());
-    assertTrue(Xfields.get(0).alias.equalsIgnoreCase("a"));
-    assertTrue(Xfields.get(0).type == DataType.INTEGER);
-    assertTrue(Xfields.get(1).alias.equalsIgnoreCase("b"));
-    assertTrue(Xfields.get(1).type == DataType.CHARARRAY);
-
-  }
-
-  public void testReadDataBasic() throws IOException {
-    PigServer server = new PigServer(ExecType.LOCAL);
-
-    server.registerQuery("X = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
-    Iterator<Tuple> XIter = server.openIterator("X");
-    int numTuplesRead = 0;
-    while( XIter.hasNext() ){
-      Tuple t = XIter.next();
-      assertEquals(2,t.size());
-      assertTrue(t.get(0).getClass() == Integer.class);
-      assertTrue(t.get(1).getClass() == String.class);
-      assertEquals(t.get(0),basicInputData.get(numTuplesRead).first);
-      assertEquals(t.get(1),basicInputData.get(numTuplesRead).second);
-      numTuplesRead++;
-    }
-    assertEquals(basicInputData.size(),numTuplesRead);
-  }
-
-  public void testSchemaLoadComplex() throws IOException{
-
-    PigServer server = new PigServer(ExecType.LOCAL);
-
-    // test that schema was loaded correctly
-    server.registerQuery("K = load '"+COMPLEX_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
-    Schema dumpedKSchema = server.dumpSchema("K");
-    List<FieldSchema> Kfields = dumpedKSchema.getFields();
-    assertEquals(6, Kfields.size());
-
-    assertEquals(DataType.CHARARRAY,Kfields.get(0).type);
-    assertEquals("name",Kfields.get(0).alias.toLowerCase());
-
-    assertEquals( DataType.INTEGER,Kfields.get(1).type);
-    assertEquals("studentid", Kfields.get(1).alias.toLowerCase());
-
-    assertEquals(DataType.TUPLE, Kfields.get(2).type);
-    assertEquals("contact", Kfields.get(2).alias.toLowerCase());
-    {
-      assertNotNull(Kfields.get(2).schema);
-      assertTrue(Kfields.get(2).schema.getFields().size() == 2);
-      assertTrue(Kfields.get(2).schema.getFields().get(0).type == DataType.CHARARRAY);
-      assertTrue(Kfields.get(2).schema.getFields().get(0).alias.equalsIgnoreCase("phno"));
-      assertTrue(Kfields.get(2).schema.getFields().get(1).type == DataType.CHARARRAY);
-      assertTrue(Kfields.get(2).schema.getFields().get(1).alias.equalsIgnoreCase("email"));
-    }
-    assertEquals(DataType.BAG, Kfields.get(3).type);
-    assertEquals("currently_registered_courses", Kfields.get(3).alias.toLowerCase());
-    {
-      assertNotNull(Kfields.get(3).schema);
-      assertEquals(1,Kfields.get(3).schema.getFields().size());
-      assertEquals(DataType.TUPLE,Kfields.get(3).schema.getFields().get(0).type);
-      assertNotNull(Kfields.get(3).schema.getFields().get(0).schema);
-      assertEquals(1,Kfields.get(3).schema.getFields().get(0).schema.getFields().size());
-      assertEquals(DataType.CHARARRAY,Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).type);
-      // assertEquals("course",Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
-      // commented out, because the name becomes "innerfield" by default - we call it "course" in pig,
-      // but in the metadata, it'd be anonymous, so this would be autogenerated, which is fine
-    }
-    assertEquals(DataType.MAP,Kfields.get(4).type);
-    assertEquals("current_grades",Kfields.get(4).alias.toLowerCase());
-    assertEquals(DataType.BAG,Kfields.get(5).type);
-    assertEquals("phnos", Kfields.get(5).alias.toLowerCase());
-    {
-      assertNotNull(Kfields.get(5).schema);
-      assertEquals(1,Kfields.get(5).schema.getFields().size());
-      assertEquals(DataType.TUPLE,Kfields.get(5).schema.getFields().get(0).type);
-      assertNotNull(Kfields.get(5).schema.getFields().get(0).schema);
-      assertTrue(Kfields.get(5).schema.getFields().get(0).schema.getFields().size() == 2);
-      assertEquals(DataType.CHARARRAY,Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).type);
-      assertEquals("phno",Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
-      assertEquals(DataType.CHARARRAY,Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).type);
-      assertEquals("type",Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).alias.toLowerCase());
-    }
-
-  }
-
-  public void testReadPartitionedBasic() throws IOException, CommandNeedRetryException {
-    PigServer server = new PigServer(ExecType.LOCAL);
-
-    driver.run("select * from "+PARTITIONED_TABLE);
-    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
-    driver.getResults(valuesReadFromHiveDriver);
-    assertEquals(basicInputData.size(), valuesReadFromHiveDriver.size());
-
-    server.registerQuery("W = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
-    Schema dumpedWSchema = server.dumpSchema("W");
-    List<FieldSchema> Wfields = dumpedWSchema.getFields();
-    assertEquals(3,Wfields.size());
-    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
-    assertTrue(Wfields.get(0).type == DataType.INTEGER);
-    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
-    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
-    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
-    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
-
-    Iterator<Tuple> WIter = server.openIterator("W");
-    Collection<Pair<Integer,String>> valuesRead = new ArrayList<Pair<Integer,String>>();
-    while( WIter.hasNext() ){
-      Tuple t = WIter.next();
-      assertTrue(t.size() == 3);
-      assertTrue(t.get(0).getClass() == Integer.class);
-      assertTrue(t.get(1).getClass() == String.class);
-      assertTrue(t.get(2).getClass() == String.class);
-      valuesRead.add(new Pair<Integer,String>((Integer)t.get(0),(String)t.get(1)));
-      if ((Integer)t.get(0) < 2){
-        assertEquals("0",t.get(2));
-      }else{
-        assertEquals("1",t.get(2));
-      }
-    }
-    assertEquals(valuesReadFromHiveDriver.size(),valuesRead.size());
-
-    server.registerQuery("P1 = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
-    server.registerQuery("P1filter = filter P1 by bkt == '0';");
-    Iterator<Tuple> P1Iter = server.openIterator("P1filter");
-    int count1 = 0;
-    while( P1Iter.hasNext() ) {
-      Tuple t = P1Iter.next();
-
-      assertEquals("0", t.get(2));
-      assertEquals(1, t.get(0));
-      count1++;
-    }
-    assertEquals(3, count1);
-
-    server.registerQuery("P2 = load '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
-    server.registerQuery("P2filter = filter P2 by bkt == '1';");
-    Iterator<Tuple> P2Iter = server.openIterator("P2filter");
-    int count2 = 0;
-    while( P2Iter.hasNext() ) {
-      Tuple t = P2Iter.next();
-
-      assertEquals("1", t.get(2));
-      assertTrue(((Integer) t.get(0)) > 1);
-      count2++;
-    }
-    assertEquals(6, count2);
-  }
-
-  public void testProjectionsBasic() throws IOException {
-
-    PigServer server = new PigServer(ExecType.LOCAL);
-
-    // projections are handled by using generate, not "as" on the Load
-
-    server.registerQuery("Y1 = load '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatLoader();");
-    server.registerQuery("Y2 = foreach Y1 generate a;");
-    server.registerQuery("Y3 = foreach Y1 generate b,a;");
-    Schema dumpedY2Schema = server.dumpSchema("Y2");
-    Schema dumpedY3Schema = server.dumpSchema("Y3");
-    List<FieldSchema> Y2fields = dumpedY2Schema.getFields();
-    List<FieldSchema> Y3fields = dumpedY3Schema.getFields();
-    assertEquals(1,Y2fields.size());
-    assertEquals("a",Y2fields.get(0).alias.toLowerCase());
-    assertEquals(DataType.INTEGER,Y2fields.get(0).type);
-    assertEquals(2,Y3fields.size());
-    assertEquals("b",Y3fields.get(0).alias.toLowerCase());
-    assertEquals(DataType.CHARARRAY,Y3fields.get(0).type);
-    assertEquals("a",Y3fields.get(1).alias.toLowerCase());
-    assertEquals(DataType.INTEGER,Y3fields.get(1).type);
-
-    int numTuplesRead = 0;
-    Iterator<Tuple> Y2Iter = server.openIterator("Y2");
-    while( Y2Iter.hasNext() ){
-      Tuple t = Y2Iter.next();
-      assertEquals(t.size(),1);
-      assertTrue(t.get(0).getClass() == Integer.class);
-      assertEquals(t.get(0),basicInputData.get(numTuplesRead).first);
-      numTuplesRead++;
-    }
-    numTuplesRead = 0;
-    Iterator<Tuple> Y3Iter = server.openIterator("Y3");
-    while( Y3Iter.hasNext() ){
-      Tuple t = Y3Iter.next();
-      assertEquals(t.size(),2);
-      assertTrue(t.get(0).getClass() == String.class);
-      assertEquals(t.get(0),basicInputData.get(numTuplesRead).second);
-      assertTrue(t.get(1).getClass() == Integer.class);
-      assertEquals(t.get(1),basicInputData.get(numTuplesRead).first);
-      numTuplesRead++;
-    }
-    assertEquals(basicInputData.size(),numTuplesRead);
-  }
-
-  public void testGetInputBytes() throws Exception {
-    File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
-    file.deleteOnExit();
-    RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-    randomAccessFile.setLength(2L * 1024 * 1024 * 1024);
-
-    Job job = new Job();
-    HCatLoader hCatLoader = new HCatLoader();
-    hCatLoader.setUDFContextSignature(this.getName());
-    hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
-    ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
-    assertEquals(2048, (long) statistics.getmBytes());
-  }
-
-  public void testConvertBooleanToInt() throws Exception {
-    String tbl = "test_convert_boolean_to_int";
-    String inputFileName = TEST_DATA_DIR + "/testConvertBooleanToInt/data.txt";
-    File inputDataDir = new File(inputFileName).getParentFile();
-    inputDataDir.mkdir();
-
-    String[] lines = new String[] {"llama\t1", "alpaca\t0"};
-    HcatTestUtils.createTestDataFile(inputFileName, lines);
-
-    assertEquals(0, driver.run("drop table if exists " + tbl).getResponseCode());
-    assertEquals(0, driver.run("create external table " + tbl +
-        " (a string, b boolean) row format delimited fields terminated by '\t'" +
-        " stored as textfile location 'file://" +
-        inputDataDir.getAbsolutePath() + "'").getResponseCode());
-
-    Properties properties = new Properties();
-    properties.setProperty(HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, "true");
-    PigServer server = new PigServer(ExecType.LOCAL, properties);
-    server.registerQuery(
-        "data = load 'test_convert_boolean_to_int' using org.apache.hcatalog.pig.HCatLoader();");
-    Schema schema = server.dumpSchema("data");
-    assertEquals(2, schema.getFields().size());
-
-    assertEquals("a", schema.getField(0).alias);
-    assertEquals(DataType.CHARARRAY, schema.getField(0).type);
-    assertEquals("b", schema.getField(1).alias);
-    assertEquals(DataType.INTEGER, schema.getField(1).type);
-
-    Iterator<Tuple> iterator = server.openIterator("data");
-    Tuple t = iterator.next();
-    assertEquals("llama", t.get(0));
-    // TODO: Figure out how to load a text file into Hive with boolean columns. This next assert
-    // passes because data was loaded as integers, not because it was converted.
-    assertEquals(1, t.get(1));
-    t = iterator.next();
-    assertEquals("alpaca", t.get(0));
-    assertEquals(0, t.get(1));
-    assertFalse(iterator.hasNext());
-  }
+    private static final String TEST_DATA_DIR = System.getProperty("user.dir") +
+        "/build/test/data/" + TestHCatLoader.class.getCanonicalName();
+    private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+    private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
+    private static final String COMPLEX_FILE_NAME = TEST_DATA_DIR + "/complex.input.data";
+
+    private static final String BASIC_TABLE = "junit_unparted_basic";
+    private static final String COMPLEX_TABLE = "junit_unparted_complex";
+    private static final String PARTITIONED_TABLE = "junit_parted_basic";
+    private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
+    private static Driver driver;
+
+    private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass
+    private static boolean setupHasRun = false;
+
+    private static Map<Integer, Pair<Integer, String>> basicInputData;
+
+    private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+        driver.run("drop table " + tablename);
+    }
+
+    private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+        String createTable;
+        createTable = "create table " + tablename + "(" + schema + ") ";
+        if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
+            createTable = createTable + "partitioned by (" + partitionedBy + ") ";
+        }
+        createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+        int retCode = driver.run(createTable).getResponseCode();
+        if (retCode != 0) {
+            throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
+        }
+    }
+
+    private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
+        createTable(tablename, schema, null);
+    }
+
+    protected void guardedSetUpBeforeClass() throws Exception {
+        if (!setupHasRun) {
+            setupHasRun = true;
+        } else {
+            return;
+        }
+
+        File f = new File(TEST_WAREHOUSE_DIR);
+        if (f.exists()) {
+            FileUtil.fullyDelete(f);
+        }
+        new File(TEST_WAREHOUSE_DIR).mkdirs();
+
+        HiveConf hiveConf = new HiveConf(this.getClass());
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+
+        cleanup();
+
+        createTable(BASIC_TABLE, "a int, b string");
+        createTable(COMPLEX_TABLE,
+            "name string, studentid int, "
+                + "contact struct<phno:string,email:string>, "
+                + "currently_registered_courses array<string>, "
+                + "current_grades map<string,string>, "
+                + "phnos array<struct<phno:string,type:string>>");
+
+        createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
+        createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
+
+        int LOOP_SIZE = 3;
+        String[] input = new String[LOOP_SIZE * LOOP_SIZE];
+        basicInputData = new HashMap<Integer, Pair<Integer, String>>();
+        int k = 0;
+        for (int i = 1; i <= LOOP_SIZE; i++) {
+            String si = i + "";
+            for (int j = 1; j <= LOOP_SIZE; j++) {
+                String sj = "S" + j + "S";
+                input[k] = si + "\t" + sj;
+                basicInputData.put(k, new Pair<Integer, String>(i, sj));
+                k++;
+            }
+        }
+        HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
+        HcatTestUtils.createTestDataFile(COMPLEX_FILE_NAME,
+            new String[]{
+                //"Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)},[PHARMACOLOGY#A-,PSYCHIATRY#B+],{(415-253-6367,cell),(408-253-6367,landline)}",
+                //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
+            }
+        );
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);");
+
+        server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+        server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+        server.registerQuery("B = foreach A generate a,b;");
+        server.registerQuery("B2 = filter B by a < 2;");
+        server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
+
+        server.registerQuery("C = foreach A generate a,b;");
+        server.registerQuery("C2 = filter C by a >= 2;");
+        server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatStorer('bkt=1');");
+
+        server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
+        server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+    }
+
+    private void cleanup() throws IOException, CommandNeedRetryException {
+        dropTable(BASIC_TABLE);
+        dropTable(COMPLEX_TABLE);
+        dropTable(PARTITIONED_TABLE);
+        dropTable(SPECIFIC_SIZE_TABLE);
+    }
+
+    protected void guardedTearDownAfterClass() throws Exception {
+        guardTestCount--;
+        if (guardTestCount > 0) {
+            return;
+        }
+        cleanup();
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        guardedSetUpBeforeClass();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        guardedTearDownAfterClass();
+    }
+
+    public void testSchemaLoadBasic() throws IOException {
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        // test that schema was loaded correctly
+        server.registerQuery("X = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema dumpedXSchema = server.dumpSchema("X");
+        List<FieldSchema> Xfields = dumpedXSchema.getFields();
+        assertEquals(2, Xfields.size());
+        assertTrue(Xfields.get(0).alias.equalsIgnoreCase("a"));
+        assertTrue(Xfields.get(0).type == DataType.INTEGER);
+        assertTrue(Xfields.get(1).alias.equalsIgnoreCase("b"));
+        assertTrue(Xfields.get(1).type == DataType.CHARARRAY);
+
+    }
+
+    public void testReadDataBasic() throws IOException {
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        server.registerQuery("X = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("X");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(2, t.size());
+            assertTrue(t.get(0).getClass() == Integer.class);
+            assertTrue(t.get(1).getClass() == String.class);
+            assertEquals(t.get(0), basicInputData.get(numTuplesRead).first);
+            assertEquals(t.get(1), basicInputData.get(numTuplesRead).second);
+            numTuplesRead++;
+        }
+        assertEquals(basicInputData.size(), numTuplesRead);
+    }
+
+    public void testSchemaLoadComplex() throws IOException {
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        // test that schema was loaded correctly
+        server.registerQuery("K = load '" + COMPLEX_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema dumpedKSchema = server.dumpSchema("K");
+        List<FieldSchema> Kfields = dumpedKSchema.getFields();
+        assertEquals(6, Kfields.size());
+
+        assertEquals(DataType.CHARARRAY, Kfields.get(0).type);
+        assertEquals("name", Kfields.get(0).alias.toLowerCase());
+
+        assertEquals(DataType.INTEGER, Kfields.get(1).type);
+        assertEquals("studentid", Kfields.get(1).alias.toLowerCase());
+
+        assertEquals(DataType.TUPLE, Kfields.get(2).type);
+        assertEquals("contact", Kfields.get(2).alias.toLowerCase());
+        {
+            assertNotNull(Kfields.get(2).schema);
+            assertTrue(Kfields.get(2).schema.getFields().size() == 2);
+            assertTrue(Kfields.get(2).schema.getFields().get(0).type == DataType.CHARARRAY);
+            assertTrue(Kfields.get(2).schema.getFields().get(0).alias.equalsIgnoreCase("phno"));
+            assertTrue(Kfields.get(2).schema.getFields().get(1).type == DataType.CHARARRAY);
+            assertTrue(Kfields.get(2).schema.getFields().get(1).alias.equalsIgnoreCase("email"));
+        }
+        assertEquals(DataType.BAG, Kfields.get(3).type);
+        assertEquals("currently_registered_courses", Kfields.get(3).alias.toLowerCase());
+        {
+            assertNotNull(Kfields.get(3).schema);
+            assertEquals(1, Kfields.get(3).schema.getFields().size());
+            assertEquals(DataType.TUPLE, Kfields.get(3).schema.getFields().get(0).type);
+            assertNotNull(Kfields.get(3).schema.getFields().get(0).schema);
+            assertEquals(1, Kfields.get(3).schema.getFields().get(0).schema.getFields().size());
+            assertEquals(DataType.CHARARRAY, Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).type);
+            // assertEquals("course",Kfields.get(3).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+            // commented out, because the name becomes "innerfield" by default - we call it "course" in pig,
+            // but in the metadata, it'd be anonymous, so this would be autogenerated, which is fine
+        }
+        assertEquals(DataType.MAP, Kfields.get(4).type);
+        assertEquals("current_grades", Kfields.get(4).alias.toLowerCase());
+        assertEquals(DataType.BAG, Kfields.get(5).type);
+        assertEquals("phnos", Kfields.get(5).alias.toLowerCase());
+        {
+            assertNotNull(Kfields.get(5).schema);
+            assertEquals(1, Kfields.get(5).schema.getFields().size());
+            assertEquals(DataType.TUPLE, Kfields.get(5).schema.getFields().get(0).type);
+            assertNotNull(Kfields.get(5).schema.getFields().get(0).schema);
+            assertTrue(Kfields.get(5).schema.getFields().get(0).schema.getFields().size() == 2);
+            assertEquals(DataType.CHARARRAY, Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).type);
+            assertEquals("phno", Kfields.get(5).schema.getFields().get(0).schema.getFields().get(0).alias.toLowerCase());
+            assertEquals(DataType.CHARARRAY, Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).type);
+            assertEquals("type", Kfields.get(5).schema.getFields().get(0).schema.getFields().get(1).alias.toLowerCase());
+        }
+
+    }
+
+    /**
+     * Reads the partitioned table through HCatLoader and checks the schema (data columns a, b plus
+     * the partition column bkt), the row count against the Hive driver, and that filtering on bkt
+     * returns only the rows of the requested partition.
+     */
+    public void testReadPartitionedBasic() throws IOException, CommandNeedRetryException {
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        driver.run("select * from " + PARTITIONED_TABLE);
+        ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+        driver.getResults(valuesReadFromHiveDriver);
+        assertEquals(basicInputData.size(), valuesReadFromHiveDriver.size());
+
+        server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema dumpedWSchema = server.dumpSchema("W");
+        List<FieldSchema> Wfields = dumpedWSchema.getFields();
+        // assertEquals (rather than assertTrue(x == y)) reports the actual value on failure.
+        assertEquals(3, Wfields.size());
+        assertEquals("a", Wfields.get(0).alias.toLowerCase());
+        assertEquals(DataType.INTEGER, Wfields.get(0).type);
+        assertEquals("b", Wfields.get(1).alias.toLowerCase());
+        assertEquals(DataType.CHARARRAY, Wfields.get(1).type);
+        assertEquals("bkt", Wfields.get(2).alias.toLowerCase());
+        assertEquals(DataType.CHARARRAY, Wfields.get(2).type);
+
+        Iterator<Tuple> WIter = server.openIterator("W");
+        int numRowsRead = 0;
+        while (WIter.hasNext()) {
+            Tuple t = WIter.next();
+            assertEquals(3, t.size());
+            assertEquals(Integer.class, t.get(0).getClass());
+            assertEquals(String.class, t.get(1).getClass());
+            assertEquals(String.class, t.get(2).getClass());
+            // bkt is expected to be '0' for rows with a < 2 and '1' otherwise.
+            if ((Integer) t.get(0) < 2) {
+                assertEquals("0", t.get(2));
+            } else {
+                assertEquals("1", t.get(2));
+            }
+            numRowsRead++;
+        }
+        assertEquals(valuesReadFromHiveDriver.size(), numRowsRead);
+
+        server.registerQuery("P1 = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        server.registerQuery("P1filter = filter P1 by bkt == '0';");
+        Iterator<Tuple> P1Iter = server.openIterator("P1filter");
+        int count1 = 0;
+        while (P1Iter.hasNext()) {
+            Tuple t = P1Iter.next();
+
+            assertEquals("0", t.get(2));
+            assertEquals(1, t.get(0));
+            count1++;
+        }
+        assertEquals(3, count1);
+
+        server.registerQuery("P2 = load '" + PARTITIONED_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        server.registerQuery("P2filter = filter P2 by bkt == '1';");
+        Iterator<Tuple> P2Iter = server.openIterator("P2filter");
+        int count2 = 0;
+        while (P2Iter.hasNext()) {
+            Tuple t = P2Iter.next();
+
+            assertEquals("1", t.get(2));
+            assertTrue("expected a > 1 in partition bkt='1' but got " + t.get(0), ((Integer) t.get(0)) > 1);
+            count2++;
+        }
+        assertEquals(6, count2);
+    }
+
+    /**
+     * Checks that projections over an HCatLoader relation (foreach ... generate, including a
+     * column reorder) produce the expected schemas and the expected values for every input row.
+     */
+    public void testProjectionsBasic() throws IOException {
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+
+        // projections are handled by using generate, not "as" on the Load
+
+        server.registerQuery("Y1 = load '" + BASIC_TABLE + "' using org.apache.hcatalog.pig.HCatLoader();");
+        server.registerQuery("Y2 = foreach Y1 generate a;");
+        server.registerQuery("Y3 = foreach Y1 generate b,a;");
+        Schema dumpedY2Schema = server.dumpSchema("Y2");
+        Schema dumpedY3Schema = server.dumpSchema("Y3");
+        List<FieldSchema> Y2fields = dumpedY2Schema.getFields();
+        List<FieldSchema> Y3fields = dumpedY3Schema.getFields();
+        assertEquals(1, Y2fields.size());
+        assertEquals("a", Y2fields.get(0).alias.toLowerCase());
+        assertEquals(DataType.INTEGER, Y2fields.get(0).type);
+        assertEquals(2, Y3fields.size());
+        assertEquals("b", Y3fields.get(0).alias.toLowerCase());
+        assertEquals(DataType.CHARARRAY, Y3fields.get(0).type);
+        assertEquals("a", Y3fields.get(1).alias.toLowerCase());
+        assertEquals(DataType.INTEGER, Y3fields.get(1).type);
+
+        // Expected value first, actual second, so failure messages read correctly.
+        int numTuplesRead = 0;
+        Iterator<Tuple> Y2Iter = server.openIterator("Y2");
+        while (Y2Iter.hasNext()) {
+            Tuple t = Y2Iter.next();
+            assertEquals(1, t.size());
+            assertEquals(Integer.class, t.get(0).getClass());
+            assertEquals(basicInputData.get(numTuplesRead).first, t.get(0));
+            numTuplesRead++;
+        }
+        // Make sure the Y2 projection returned every row, not just a prefix of them.
+        assertEquals(basicInputData.size(), numTuplesRead);
+
+        numTuplesRead = 0;
+        Iterator<Tuple> Y3Iter = server.openIterator("Y3");
+        while (Y3Iter.hasNext()) {
+            Tuple t = Y3Iter.next();
+            assertEquals(2, t.size());
+            assertEquals(String.class, t.get(0).getClass());
+            assertEquals(basicInputData.get(numTuplesRead).second, t.get(0));
+            assertEquals(Integer.class, t.get(1).getClass());
+            assertEquals(basicInputData.get(numTuplesRead).first, t.get(1));
+            numTuplesRead++;
+        }
+        assertEquals(basicInputData.size(), numTuplesRead);
+    }
+
+    /**
+     * Checks that HCatLoader.getStatistics reports the input size in megabytes, using a 2 GiB
+     * data file (sized via RandomAccessFile.setLength) in the table's warehouse directory.
+     */
+    public void testGetInputBytes() throws Exception {
+        File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
+        file.deleteOnExit();
+        // Close the handle once the length is set so the file descriptor is not leaked.
+        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+        try {
+            randomAccessFile.setLength(2L * 1024 * 1024 * 1024);
+        } finally {
+            randomAccessFile.close();
+        }
+
+        Job job = new Job();
+        HCatLoader hCatLoader = new HCatLoader();
+        hCatLoader.setUDFContextSignature(this.getName());
+        hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
+        ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
+        assertEquals(2048, (long) statistics.getmBytes());
+    }
+
+    /**
+     * Checks that with HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER set to "true", a Hive
+     * boolean column is exposed to Pig by HCatLoader as an int column.
+     */
+    public void testConvertBooleanToInt() throws Exception {
+        String tbl = "test_convert_boolean_to_int";
+        String inputFileName = TEST_DATA_DIR + "/testConvertBooleanToInt/data.txt";
+        File inputDataDir = new File(inputFileName).getParentFile();
+        // mkdirs also creates missing parents; fail fast instead of silently ignoring a failed mkdir.
+        assertTrue("could not create " + inputDataDir, inputDataDir.isDirectory() || inputDataDir.mkdirs());
+
+        String[] lines = new String[]{"llama\t1", "alpaca\t0"};
+        HcatTestUtils.createTestDataFile(inputFileName, lines);
+
+        assertEquals(0, driver.run("drop table if exists " + tbl).getResponseCode());
+        assertEquals(0, driver.run("create external table " + tbl +
+            " (a string, b boolean) row format delimited fields terminated by '\t'" +
+            " stored as textfile location 'file://" +
+            inputDataDir.getAbsolutePath() + "'").getResponseCode());
+
+        Properties properties = new Properties();
+        properties.setProperty(HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, "true");
+        PigServer server = new PigServer(ExecType.LOCAL, properties);
+        // Reuse tbl so the load cannot drift from the table created above.
+        server.registerQuery(
+            "data = load '" + tbl + "' using org.apache.hcatalog.pig.HCatLoader();");
+        Schema schema = server.dumpSchema("data");
+        assertEquals(2, schema.getFields().size());
+
+        assertEquals("a", schema.getField(0).alias);
+        assertEquals(DataType.CHARARRAY, schema.getField(0).type);
+        assertEquals("b", schema.getField(1).alias);
+        assertEquals(DataType.INTEGER, schema.getField(1).type);
+
+        Iterator<Tuple> iterator = server.openIterator("data");
+        assertTrue("expected a first row", iterator.hasNext());
+        Tuple t = iterator.next();
+        assertEquals("llama", t.get(0));
+        // TODO: Figure out how to load a text file into Hive with boolean columns. This next assert
+        // passes because data was loaded as integers, not because it was converted.
+        assertEquals(1, t.get(1));
+        assertTrue("expected a second row", iterator.hasNext());
+        t = iterator.next();
+        assertEquals("alpaca", t.get(0));
+        assertEquals(0, t.get(1));
+        assertFalse(iterator.hasNext());
+    }
 }

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java Mon Sep 10 23:28:55 2012
@@ -50,239 +50,252 @@ import org.slf4j.LoggerFactory;
 
 public class TestHCatLoaderComplexSchema {
 
-  //private static MiniCluster cluster = MiniCluster.buildCluster();
-  private static Driver driver;
-  //private static Properties props;
-  private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderComplexSchema.class);
-  private void dropTable(String tablename) throws IOException, CommandNeedRetryException{
-    driver.run("drop table "+tablename);
-  }
-
-  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException{
-    String createTable;
-    createTable = "create table "+tablename+"("+schema+") ";
-    if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
-      createTable = createTable + "partitioned by ("+partitionedBy+") ";
-    }
-    createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
-        "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
-    LOG.info("Creating table:\n {}", createTable);
-    CommandProcessorResponse result = driver.run(createTable);
-    int retCode = result.getResponseCode();
-    if(retCode != 0) {
-      throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+" "+result.getErrorMessage()+"]");
-    }
-  }
-
-  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException{
-    createTable(tablename,schema,null);
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-
-    HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class  );
-    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-    driver = new Driver(hiveConf);
-    SessionState.start(new CliSessionState(hiveConf));
-    //props = new Properties();
-    //props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
-
-  }
-
-  private static final TupleFactory tf = TupleFactory.getInstance();
-  private static final BagFactory bf = BagFactory.getInstance();
-  private Tuple t(Object... objects) {
-    return tf.newTuple(Arrays.asList(objects));
-  }
-  private DataBag b(Tuple... objects) {
-    return bf.newDefaultBag(Arrays.asList(objects));
-  }
-
-  /**
-   * artificially complex nested schema to test nested schema conversion
-   * @throws Exception
-   */
-  @Test
-  public void testSyntheticComplexSchema() throws Exception {
-    String pigSchema =
-        "(" +
-          "a: " +
-            "(" +
-              "aa: chararray, " +
-              "ab: long, " +
-              "ac: map[], " +
-              "ad: { t: (ada: long) }, " +
-              "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
-              "af: (afa: chararray, afb: long) " +
-            ")," +
-           "b: chararray, " +
-           "c: long, " +
-           "d:  { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
-         ")";
-
-    // with extra structs
-    String tableSchema =
-        "a struct<" +
-            "aa: string, " +
-            "ab: bigint, " +
-            "ac: map<string, string>, " +
-            "ad: array<struct<ada:bigint>>, " +
-            "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
-            "af: struct<afa: string, afb: bigint> " +
-            ">, " +
-        "b string, " +
-        "c bigint, " +
-        "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<struct<dca: bigint>>>>";
-
-    // without extra structs
-    String tableSchema2 =
-        "a struct<" +
-            "aa: string, " +
-            "ab: bigint, " +
-            "ac: map<string, string>, " +
-            "ad: array<bigint>, " +
-            "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
-            "af: struct<afa: string, afb: bigint> " +
-            ">, " +
-        "b string, " +
-        "c bigint, " +
-        "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<bigint>>>";
-
-    List<Tuple> data = new ArrayList<Tuple>();
-    for (int i = 0; i < 10; i++) {
-      Tuple t = t(
-          t(
-              "aa test",
-              2l,
-              new HashMap<String, String>() {{put("ac test1", "test 1");put("ac test2", "test 2");}},
-              b(t(3l), t(4l)),
-              b(t(5l, t("aeba test", 6l))),
-              t("afa test", 7l)
-          ),
-          "b test",
-          (long)i,
-          b(t(8l, t("dba test", 9l), b(t(10l)))));
-
-      data.add(t);
-    }
-    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
-    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
-    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
-    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
-
-  }
-
-  private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
-      throws IOException, CommandNeedRetryException, ExecException, FrontendException {
-    MockLoader.setData(tablename+"Input", data);
-    try {
-      createTable(tablename, tableSchema);
-      PigServer server = new PigServer(ExecType.LOCAL);
-      server.setBatchOn();
-      server.registerQuery("A = load '"+tablename+"Input' using org.apache.hcatalog.pig.MockLoader() AS "+pigSchema+";");
-      Schema dumpedASchema = server.dumpSchema("A");
-      server.registerQuery("STORE A into '"+tablename+"' using org.apache.hcatalog.pig.HCatStorer("
-          + (provideSchemaToStorer ? "'', '"+pigSchema+"'" : "")
-          + ");");
-
-      ExecJob execJob = server.executeBatch().get(0);
-      if (!execJob.getStatistics().isSuccessful()) {
-        throw new RuntimeException("Import failed", execJob.getException());
-      }
-      // test that schema was loaded correctly
-      server.registerQuery("X = load '"+tablename+"' using org.apache.hcatalog.pig.HCatLoader();");
-      server.dumpSchema("X");
-      Iterator<Tuple> it = server.openIterator("X");
-      int i = 0;
-      while (it.hasNext()) {
-        Tuple input = data.get(i++);
-        Tuple output = it.next();
-        Assert.assertEquals(input.toString(), output.toString());
-        LOG.info("tuple : {} ",output);
-      }
-      Schema dumpedXSchema = server.dumpSchema("X");
-
-      Assert.assertEquals(
-          "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)",
-          "",
-          compareIgnoreFiledNames(dumpedASchema, dumpedXSchema));
-
-    } finally {
-      dropTable(tablename);
-    }
-  }
-
-  private String compareIgnoreFiledNames(Schema expected, Schema got) throws FrontendException {
-    if (expected == null || got == null) {
-      if (expected == got) {
-        return "";
-      } else {
-        return "\nexpected "+expected+" got "+got;
-      }
-    }
-    if (expected.size() != got.size()) {
-      return "\nsize expected "+expected.size()+" ("+expected+") got "+got.size()+" ("+got+")";
-    }
-    String message = "";
-    for (int i = 0; i < expected.size(); i++) {
-      FieldSchema expectedField = expected.getField(i);
-      FieldSchema gotField = got.getField(i);
-      if (expectedField.type != gotField.type) {
-        message += "\ntype expected "+expectedField.type+" ("+expectedField+") got "+gotField.type+" ("+gotField+")";
-      } else {
-        message += compareIgnoreFiledNames(expectedField.schema, gotField.schema);
-      }
-    }
-    return message;
-  }
-
-  /**
-   * tests that unnecessary tuples are drop while converting schema
-   * (Pig requires Tuples in Bags)
-   * @throws Exception
-   */
-  @Test
-  public void testTupleInBagInTupleInBag() throws Exception {
-    String pigSchema = "(a: { b : ( c: { d: (i : long) } ) })";
-
-    String tableSchema = "a array< array< bigint > >";
-
-    List<Tuple> data = new ArrayList<Tuple>();
-    data.add(t(b(t(b(t(100l),t(101l))), t(b(t(110l))))));
-    data.add(t(b(t(b(t(200l))), t(b(t(210l))), t(b(t(220l))))));
-    data.add(t(b(t(b(t(300l),t(301l))))));
-    data.add(t(b(t(b(t(400l))), t(b(t(410l),t(411l),t(412l))))));
-
-
-    verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true);
-    verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false);
-
-    // test that we don't drop the unnecessary tuple if the table has the corresponding Struct
-    String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >";
-
-    verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true);
-    verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false);
-
-  }
-
-  @Test
-  public void testMapWithComplexData() throws Exception {
-    String pigSchema = "(a: long, b: map[])";
-    String tableSchema = "a bigint, b map<string, struct<aa:bigint, ab:string>>";
-
-    List<Tuple> data = new ArrayList<Tuple>();
-    for (int i = 0; i < 10; i++) {
-      Tuple t = t(
-          (long)i,
-          new HashMap<String, Object>() {{put("b test 1", t(1l,"test 1"));put("b test 2", t(2l, "test 2"));}});
+    // NOTE(review): the commented-out MiniCluster/props declarations are dead code; consider deleting.
+    //private static MiniCluster cluster = MiniCluster.buildCluster();
+    /** Hive driver shared by all tests: created in setUpBeforeClass, used to run table DDL. */
+    private static Driver driver;
+    //private static Properties props;
+    private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderComplexSchema.class);
+
+    /**
+     * Drops {@code tablename} through the shared Hive driver. The driver's response code is not
+     * inspected, so a failed drop goes unreported.
+     */
+    private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+        String dropStatement = "drop table " + tablename;
+        driver.run(dropStatement);
+    }
+
+    /**
+     * Creates an RCFile-backed table through the shared Hive driver.
+     *
+     * @param tablename     name of the table to create
+     * @param schema        column list, placed verbatim between the parentheses
+     * @param partitionedBy partition column list; no partition clause when null or blank
+     * @throws IOException if the Hive driver returns a non-zero response code
+     */
+    private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+        StringBuilder ddl = new StringBuilder("create table ")
+            .append(tablename).append("(").append(schema).append(") ");
+        boolean partitioned = partitionedBy != null && !partitionedBy.trim().isEmpty();
+        if (partitioned) {
+            ddl.append("partitioned by (").append(partitionedBy).append(") ");
+        }
+        ddl.append("stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',")
+            .append("'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ");
+        String createTable = ddl.toString();
+        LOG.info("Creating table:\n {}", createTable);
+        CommandProcessorResponse result = driver.run(createTable);
+        int retCode = result.getResponseCode();
+        if (retCode == 0) {
+            return;
+        }
+        throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + " " + result.getErrorMessage() + "]");
+    }
+
+    private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
+        createTable(tablename, schema, null);
+    }
+
+    /**
+     * Creates the shared Hive driver once for the class. Pre/post execution hooks are set to empty,
+     * concurrency support is turned off, and a CLI session is started on the same configuration.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+
+        HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class);
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        // NOTE(review): presumably disabled so no lock manager is needed for local tests — confirm.
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+        //props = new Properties();
+        //props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
 
-      data.add(t);
     }
-  verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true);
-  verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false);
 
-  }
- }
+    /** Pig tuple factory used by the t(...) helper. */
+    private static final TupleFactory tf = TupleFactory.getInstance();
+    /** Pig bag factory used by the b(...) helper. */
+    private static final BagFactory bf = BagFactory.getInstance();
+
+    /** Builds a Pig tuple whose fields are {@code objects}, in order. */
+    private Tuple t(Object... objects) {
+        List<Object> fields = Arrays.asList(objects);
+        return tf.newTuple(fields);
+    }
+
+    /** Builds a default Pig bag holding {@code objects}, in order. */
+    private DataBag b(Tuple... objects) {
+        List<Tuple> tuples = Arrays.asList(objects);
+        return bf.newDefaultBag(tuples);
+    }
+
+    /**
+     * Artificially complex nested schema to test nested schema conversion. The same data is
+     * round-tripped against a table that keeps Pig's tuple-in-bag wrappers as structs and one
+     * that does not, each with and without passing the Pig schema to HCatStorer.
+     * @throws Exception
+     */
+    @Test
+    public void testSyntheticComplexSchema() throws Exception {
+        String pigSchema =
+            "(" +
+                "a: " +
+                "(" +
+                "aa: chararray, " +
+                "ab: long, " +
+                "ac: map[], " +
+                "ad: { t: (ada: long) }, " +
+                "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
+                "af: (afa: chararray, afb: long) " +
+                ")," +
+                "b: chararray, " +
+                "c: long, " +
+                "d:  { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
+                ")";
+
+        // with extra structs
+        String tableSchema =
+            "a struct<" +
+                "aa: string, " +
+                "ab: bigint, " +
+                "ac: map<string, string>, " +
+                "ad: array<struct<ada:bigint>>, " +
+                "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+                "af: struct<afa: string, afb: bigint> " +
+                ">, " +
+                "b string, " +
+                "c bigint, " +
+                "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<struct<dca: bigint>>>>";
+
+        // without extra structs
+        String tableSchema2 =
+            "a struct<" +
+                "aa: string, " +
+                "ab: bigint, " +
+                "ac: map<string, string>, " +
+                "ad: array<bigint>, " +
+                "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+                "af: struct<afa: string, afb: bigint> " +
+                ">, " +
+                "b string, " +
+                "c bigint, " +
+                "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<bigint>>>";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        for (int i = 0; i < 10; i++) {
+            // Plain HashMap rather than double-brace initialization, which would create an
+            // anonymous HashMap subclass holding a reference to this test instance.
+            HashMap<String, String> acMap = new HashMap<String, String>();
+            acMap.put("ac test1", "test 1");
+            acMap.put("ac test2", "test 2");
+
+            // Upper-case L suffixes: a lower-case l is easily misread as the digit 1.
+            Tuple row = t(
+                t(
+                    "aa test",
+                    2L,
+                    acMap,
+                    b(t(3L), t(4L)),
+                    b(t(5L, t("aeba test", 6L))),
+                    t("afa test", 7L)
+                ),
+                "b test",
+                (long) i,
+                b(t(8L, t("dba test", 9L), b(t(10L)))));
+
+            data.add(row);
+        }
+        verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
+        verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
+        verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
+        verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
+
+    }
+
+    /**
+     * Stores {@code data} into a newly created table with HCatStorer, reads it back with HCatLoader
+     * and checks that the tuples (compared via toString, in order) and the schema (ignoring field
+     * names) survive the round trip. The table is dropped afterwards, even on failure.
+     *
+     * @param tablename             table to create; also prefixes the MockLoader input name
+     * @param pigSchema             Pig schema applied when loading the input through MockLoader
+     * @param tableSchema           Hive column list used to create the table
+     * @param data                  tuples to write; also the expected read-back contents
+     * @param provideSchemaToStorer whether {@code pigSchema} is passed to HCatStorer explicitly
+     */
+    private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
+        throws IOException, CommandNeedRetryException, ExecException, FrontendException {
+        MockLoader.setData(tablename + "Input", data);
+        try {
+            createTable(tablename, tableSchema);
+            PigServer server = new PigServer(ExecType.LOCAL);
+            server.setBatchOn();
+            server.registerQuery("A = load '" + tablename + "Input' using org.apache.hcatalog.pig.MockLoader() AS " + pigSchema + ";");
+            Schema dumpedASchema = server.dumpSchema("A");
+            server.registerQuery("STORE A into '" + tablename + "' using org.apache.hcatalog.pig.HCatStorer("
+                + (provideSchemaToStorer ? "'', '" + pigSchema + "'" : "")
+                + ");");
+
+            ExecJob execJob = server.executeBatch().get(0);
+            if (!execJob.getStatistics().isSuccessful()) {
+                throw new RuntimeException("Import failed", execJob.getException());
+            }
+            // test that schema was loaded correctly
+            server.registerQuery("X = load '" + tablename + "' using org.apache.hcatalog.pig.HCatLoader();");
+            Schema dumpedXSchema = server.dumpSchema("X");
+            Iterator<Tuple> it = server.openIterator("X");
+            int i = 0;
+            while (it.hasNext()) {
+                // Report extra rows as an assertion failure instead of an IndexOutOfBoundsException.
+                Assert.assertTrue("read back more tuples than the " + data.size() + " written", i < data.size());
+                Tuple input = data.get(i++);
+                Tuple output = it.next();
+                Assert.assertEquals(input.toString(), output.toString());
+                LOG.info("tuple : {} ", output);
+            }
+            // Without this, a loader returning fewer (or no) tuples would pass unnoticed.
+            Assert.assertEquals("number of tuples read back", data.size(), i);
+
+            Assert.assertEquals(
+                "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)",
+                "",
+                compareIgnoreFiledNames(dumpedASchema, dumpedXSchema));
+
+        } finally {
+            dropTable(tablename);
+        }
+    }
+
+    /**
+     * Recursively compares two Pig schemas by field count and field type, ignoring aliases.
+     *
+     * @param expected schema that was written (may be null)
+     * @param got      schema that was read back (may be null)
+     * @return an empty string when they match, otherwise one newline-prefixed line per mismatch
+     * @throws FrontendException if a field cannot be fetched from either schema
+     */
+    private String compareIgnoreFiledNames(Schema expected, Schema got) throws FrontendException {
+        if (expected == null && got == null) {
+            return "";
+        }
+        if (expected == null || got == null) {
+            return "\nexpected " + expected + " got " + got;
+        }
+        int expectedSize = expected.size();
+        int gotSize = got.size();
+        if (expectedSize != gotSize) {
+            return "\nsize expected " + expectedSize + " (" + expected + ") got " + gotSize + " (" + got + ")";
+        }
+        StringBuilder mismatches = new StringBuilder();
+        for (int index = 0; index < expectedSize; index++) {
+            FieldSchema want = expected.getField(index);
+            FieldSchema have = got.getField(index);
+            if (want.type == have.type) {
+                // Same type at this level: descend into any nested schema.
+                mismatches.append(compareIgnoreFiledNames(want.schema, have.schema));
+            } else {
+                mismatches.append("\ntype expected ").append(want.type).append(" (").append(want)
+                    .append(") got ").append(have.type).append(" (").append(have).append(")");
+            }
+        }
+        return mismatches.toString();
+    }
+
+    /**
+     * Tests that unnecessary tuples are dropped while converting the schema
+     * (Pig requires tuples in bags), and that they are kept when the table
+     * declares the corresponding struct.
+     * @throws Exception
+     */
+    @Test
+    public void testTupleInBagInTupleInBag() throws Exception {
+        String pigSchema = "(a: { b : ( c: { d: (i : long) } ) })";
+
+        String tableSchema = "a array< array< bigint > >";
+
+        // Upper-case L suffixes: a lower-case l is easily misread as the digit 1.
+        List<Tuple> data = new ArrayList<Tuple>();
+        data.add(t(b(t(b(t(100L), t(101L))), t(b(t(110L))))));
+        data.add(t(b(t(b(t(200L))), t(b(t(210L))), t(b(t(220L))))));
+        data.add(t(b(t(b(t(300L), t(301L))))));
+        data.add(t(b(t(b(t(400L))), t(b(t(410L), t(411L), t(412L))))));
+
+
+        verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true);
+        verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false);
+
+        // test that we don't drop the unnecessary tuple if the table has the corresponding Struct
+        String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >";
+
+        verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true);
+        verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false);
+
+    }
+
+    /**
+     * Round-trips a Hive map whose values are structs (map&lt;string, struct&lt;aa:bigint, ab:string&gt;&gt;)
+     * through a Pig map[] field, with and without passing the Pig schema to HCatStorer.
+     */
+    @Test
+    public void testMapWithComplexData() throws Exception {
+        String pigSchema = "(a: long, b: map[])";
+        String tableSchema = "a bigint, b map<string, struct<aa:bigint, ab:string>>";
+
+        List<Tuple> data = new ArrayList<Tuple>();
+        for (int i = 0; i < 10; i++) {
+            // Plain HashMap rather than double-brace initialization, which would create an
+            // anonymous HashMap subclass holding a reference to this test instance.
+            HashMap<String, Object> bMap = new HashMap<String, Object>();
+            bMap.put("b test 1", t(1L, "test 1"));
+            bMap.put("b test 2", t(2L, "test 2"));
+
+            Tuple row = t((long) i, bMap);
+            data.add(row);
+        }
+        verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true);
+        verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false);
+
+    }
+}

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java Mon Sep 10 23:28:55 2012
@@ -40,101 +40,101 @@ import java.util.List;
  */
 public class TestHCatLoaderStorer extends HCatBaseTest {
 
-  /**
-   * Ensure Pig can read/write tinyint/smallint columns.
-   */
-  @Test
-  public void testSmallTinyInt() throws Exception {
-
-    String readTblName = "test_small_tiny_int";
-    File dataDir = new File(TEST_DATA_DIR + "/testSmallTinyIntData");
-    File dataFile = new File(dataDir, "testSmallTinyInt.tsv");
-
-    String writeTblName = "test_small_tiny_int_write";
-    File writeDataFile = new File(TEST_DATA_DIR, writeTblName + ".tsv");
-
-    FileUtil.fullyDelete(dataDir); // Might not exist
-    Assert.assertTrue(dataDir.mkdir());
-
-    HcatTestUtils.createTestDataFile(dataFile.getAbsolutePath(), new String[]{
-        String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
-        String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
-    });
-
-    // Create a table with smallint/tinyint columns, load data, and query from Hive.
-    Assert.assertEquals(0, driver.run("drop table if exists " + readTblName).getResponseCode());
-    Assert.assertEquals(0, driver.run("create external table " + readTblName +
-        " (my_small_int smallint, my_tiny_int tinyint)" +
-        " row format delimited fields terminated by '\t' stored as textfile").getResponseCode());
-    Assert.assertEquals(0, driver.run("load data local inpath '" +
-        dataDir.getAbsolutePath() + "' into table " + readTblName).getResponseCode());
-
-    PigServer server = new PigServer(ExecType.LOCAL);
-    server.registerQuery(
-        "data = load '" + readTblName + "' using org.apache.hcatalog.pig.HCatLoader();");
-
-    // Ensure Pig schema is correct.
-    Schema schema = server.dumpSchema("data");
-    Assert.assertEquals(2, schema.getFields().size());
-    Assert.assertEquals("my_small_int", schema.getField(0).alias);
-    Assert.assertEquals(DataType.INTEGER, schema.getField(0).type);
-    Assert.assertEquals("my_tiny_int", schema.getField(1).alias);
-    Assert.assertEquals(DataType.INTEGER, schema.getField(1).type);
-
-    // Ensure Pig can read data correctly.
-    Iterator<Tuple> it = server.openIterator("data");
-    Tuple t = it.next();
-    Assert.assertEquals(new Integer(Short.MIN_VALUE), t.get(0));
-    Assert.assertEquals(new Integer(Byte.MIN_VALUE), t.get(1));
-    t = it.next();
-    Assert.assertEquals(new Integer(Short.MAX_VALUE), t.get(0));
-    Assert.assertEquals(new Integer(Byte.MAX_VALUE), t.get(1));
-    Assert.assertFalse(it.hasNext());
-
-    // Ensure Pig can write correctly to smallint/tinyint columns. This means values within the
-    // bounds of the column type are written, and values outside throw an exception.
-    Assert.assertEquals(0, driver.run("drop table if exists " + writeTblName).getResponseCode());
-    Assert.assertEquals(0, driver.run("create table " + writeTblName +
-        " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode());
-
-    // Values within the column type bounds.
-    HcatTestUtils.createTestDataFile(writeDataFile.getAbsolutePath(), new String[]{
-        String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
-        String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
-    });
-    smallTinyIntBoundsCheckHelper(writeDataFile.getAbsolutePath(), ExecJob.JOB_STATUS.COMPLETED);
-
-    // Values outside the column type bounds will fail at runtime.
-    HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooSmall.tsv", new String[]{
-        String.format("%d\t%d", Short.MIN_VALUE - 1, 0)});
-    smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooSmall.tsv", ExecJob.JOB_STATUS.FAILED);
-
-    HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooBig.tsv", new String[]{
-        String.format("%d\t%d", Short.MAX_VALUE + 1, 0)});
-    smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooBig.tsv", ExecJob.JOB_STATUS.FAILED);
-
-    HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooSmall.tsv", new String[]{
-        String.format("%d\t%d", 0, Byte.MIN_VALUE - 1)});
-    smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooSmall.tsv", ExecJob.JOB_STATUS.FAILED);
-
-    HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooBig.tsv", new String[]{
-        String.format("%d\t%d", 0, Byte.MAX_VALUE + 1)});
-    smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooBig.tsv", ExecJob.JOB_STATUS.FAILED);
-  }
-
-  private void smallTinyIntBoundsCheckHelper(String data, ExecJob.JOB_STATUS expectedStatus)
-      throws Exception {
-    Assert.assertEquals(0, driver.run("drop table if exists test_tbl").getResponseCode());
-    Assert.assertEquals(0, driver.run("create table test_tbl" +
-        " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode());
-
-    PigServer server = new PigServer(ExecType.LOCAL);
-    server.setBatchOn();
-    server.registerQuery("data = load '" + data +
-        "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);");
-    server.registerQuery(
-        "store data into 'test_tbl' using org.apache.hcatalog.pig.HCatStorer();");
-    List<ExecJob> jobs = server.executeBatch();
-    Assert.assertEquals(expectedStatus, jobs.get(0).getStatus());
-  }
+    /**
+     * Ensure Pig can read/write tinyint/smallint columns.
+     *
+     * <p>Reading: both column types are exposed to Pig as {@code int}, and their boundary
+     * values come back unchanged. Writing: values within the column bounds store successfully,
+     * while a value one past either bound makes the store job fail.
+     */
+    @Test
+    public void testSmallTinyInt() throws Exception {
+
+        String readTblName = "test_small_tiny_int";
+        File dataDir = new File(TEST_DATA_DIR + "/testSmallTinyIntData");
+        File dataFile = new File(dataDir, "testSmallTinyInt.tsv");
+
+        // In-bounds input for the write checks. The stores themselves go into the table that
+        // smallTinyIntBoundsCheckHelper (re)creates, so no separate write table is set up here.
+        File writeDataFile = new File(TEST_DATA_DIR, "test_small_tiny_int_write.tsv");
+
+        FileUtil.fullyDelete(dataDir); // Might not exist
+        Assert.assertTrue(dataDir.mkdir());
+
+        HcatTestUtils.createTestDataFile(dataFile.getAbsolutePath(), new String[]{
+            String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
+            String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
+        });
+
+        // Create a table with smallint/tinyint columns, load data, and query from Hive.
+        Assert.assertEquals(0, driver.run("drop table if exists " + readTblName).getResponseCode());
+        Assert.assertEquals(0, driver.run("create external table " + readTblName +
+            " (my_small_int smallint, my_tiny_int tinyint)" +
+            " row format delimited fields terminated by '\t' stored as textfile").getResponseCode());
+        Assert.assertEquals(0, driver.run("load data local inpath '" +
+            dataDir.getAbsolutePath() + "' into table " + readTblName).getResponseCode());
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        try {
+            server.registerQuery(
+                "data = load '" + readTblName + "' using org.apache.hcatalog.pig.HCatLoader();");
+
+            // Ensure Pig schema is correct.
+            Schema schema = server.dumpSchema("data");
+            Assert.assertEquals(2, schema.getFields().size());
+            Assert.assertEquals("my_small_int", schema.getField(0).alias);
+            Assert.assertEquals(DataType.INTEGER, schema.getField(0).type);
+            Assert.assertEquals("my_tiny_int", schema.getField(1).alias);
+            Assert.assertEquals(DataType.INTEGER, schema.getField(1).type);
+
+            // Ensure Pig can read data correctly. Check hasNext() before each next() so that a
+            // short result fails as an assertion rather than a NoSuchElementException.
+            Iterator<Tuple> it = server.openIterator("data");
+            Assert.assertTrue(it.hasNext());
+            Tuple t = it.next();
+            Assert.assertEquals(Integer.valueOf(Short.MIN_VALUE), t.get(0));
+            Assert.assertEquals(Integer.valueOf(Byte.MIN_VALUE), t.get(1));
+            Assert.assertTrue(it.hasNext());
+            t = it.next();
+            Assert.assertEquals(Integer.valueOf(Short.MAX_VALUE), t.get(0));
+            Assert.assertEquals(Integer.valueOf(Byte.MAX_VALUE), t.get(1));
+            Assert.assertFalse(it.hasNext());
+        } finally {
+            // Release this PigServer even when an assertion above fails.
+            server.shutdown();
+        }
+
+        // Ensure Pig can write correctly to smallint/tinyint columns. Values within the bounds
+        // of the column type are written; values outside them make the store job fail.
+
+        // Values within the column type bounds.
+        HcatTestUtils.createTestDataFile(writeDataFile.getAbsolutePath(), new String[]{
+            String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE),
+            String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE)
+        });
+        smallTinyIntBoundsCheckHelper(writeDataFile.getAbsolutePath(), ExecJob.JOB_STATUS.COMPLETED);
+
+        // Values outside the column type bounds will fail at runtime.
+        assertOutOfBoundsStoreFails("shortTooSmall.tsv", Short.MIN_VALUE - 1, 0);
+        assertOutOfBoundsStoreFails("shortTooBig.tsv", Short.MAX_VALUE + 1, 0);
+        assertOutOfBoundsStoreFails("byteTooSmall.tsv", 0, Byte.MIN_VALUE - 1);
+        assertOutOfBoundsStoreFails("byteTooBig.tsv", 0, Byte.MAX_VALUE + 1);
+    }
+
+    /**
+     * Writes a single tab-separated (smallint, tinyint) row to {@code TEST_DATA_DIR/fileName}
+     * and asserts that storing it through smallTinyIntBoundsCheckHelper ends in FAILED.
+     *
+     * @param fileName name of the data file to create under TEST_DATA_DIR
+     * @param smallInt value for the smallint column
+     * @param tinyInt value for the tinyint column
+     */
+    private void assertOutOfBoundsStoreFails(String fileName, int smallInt, int tinyInt)
+        throws Exception {
+        String path = TEST_DATA_DIR + "/" + fileName;
+        HcatTestUtils.createTestDataFile(path, new String[]{
+            String.format("%d\t%d", smallInt, tinyInt)});
+        smallTinyIntBoundsCheckHelper(path, ExecJob.JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Recreates {@code test_tbl} with smallint/tinyint columns, stores the tab-separated file at
+     * {@code data} into it via HCatStorer, and asserts the status of the resulting job.
+     *
+     * @param data path of a tab-separated input file with two integer columns
+     * @param expectedStatus status the store job is expected to finish with
+     */
+    private void smallTinyIntBoundsCheckHelper(String data, ExecJob.JOB_STATUS expectedStatus)
+        throws Exception {
+        Assert.assertEquals(0, driver.run("drop table if exists test_tbl").getResponseCode());
+        Assert.assertEquals(0, driver.run("create table test_tbl" +
+            " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode());
+
+        PigServer server = new PigServer(ExecType.LOCAL);
+        try {
+            // Run in batch mode so the store's ExecJob status can be inspected below.
+            server.setBatchOn();
+            server.registerQuery("data = load '" + data +
+                "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);");
+            server.registerQuery(
+                "store data into 'test_tbl' using org.apache.hcatalog.pig.HCatStorer();");
+            List<ExecJob> jobs = server.executeBatch();
+            // Fail with a clear message instead of an IndexOutOfBoundsException.
+            Assert.assertFalse("executeBatch returned no jobs", jobs.isEmpty());
+            Assert.assertEquals(expectedStatus, jobs.get(0).getStatus());
+        } finally {
+            // Release this PigServer even when an assertion above fails.
+            server.shutdown();
+        }
+    }
 }



Mime
View raw message