pig-commits mailing list archives

From pradeep...@apache.org
Subject svn commit: r897283 [2/5] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/ contrib/zebra/ contrib/zebra/src/java/org/apache/hadoop/zebra/pig/ contrib/zebra/src/java/org/apache/hadoop/zebra/types/ contrib/zebra/src/test/e2e/merg...
Date Fri, 08 Jan 2010 18:17:12 GMT
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNegative.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNegative.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNegative.java Fri Jan  8 18:17:07 2010
@@ -94,6 +94,33 @@
     }
   }
 
+  // Negative test case. For record split, we should not try to store the same
+  // record field in different column groups.
+  @Test
+  public void testWriteRecord6() throws IOException, ParseException {
+    String STR_SCHEMA = "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4))";
+    String STR_STORAGE = "[r1.f1]; [r1.f2, r2.r3.f3]; [r2.r3]";
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), this.getClass().getSimpleName());
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+    // Build Table and column groups
+    BasicTable.Writer writer = null;
+    try {
+      writer = new BasicTable.Writer(path, STR_SCHEMA, STR_STORAGE, conf);
+      Assert.fail("Should throw exception");
+    } catch (Exception e) {
+      System.out.println(e);
+    }
+  }
+  
   // Negative test case. map storage syntax is wrong
   @Test
   public void testWriteMap1() throws IOException, ParseException {

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,1048 @@
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.Assert;
+
+/**
+ * ToolTestComparator
+ * 
+ * Utility for verifying tables created during Zebra stress testing.
+ * 
+ */
+public class ToolTestComparator {
+
+  final static String TABLE_SCHEMA = "count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes,"
+      + "byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string, f2:string),"
+      + "c1:collection(a:string, b:string)";
+  final static String TABLE_STORAGE = "[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]";
+
+  private static Random generator = new Random();
+
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  protected static ExecJob pigJob;
+  private static Path path;
+
+  private static String zebraJar;
+  private static String whichCluster;
+
+  private static int totalNumbCols;
+  private static long totalNumbVerifiedRows;
+
+  /**
+   * Setup and initialize environment
+   */
+  public static void setUp() throws Exception {
+    System.out.println("setUp()");
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    
+    if (System.getProperty("whichCluster") == null) {
+      System.setProperty("whichCluster", "miniCluster");
+      whichCluster = System.getProperty("whichCluster");
+    } else {
+      whichCluster = System.getProperty("whichCluster");
+    }
+
+    System.out.println("cluster: " + whichCluster);
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("HADOOP_HOME") == null) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(0);
+    }
+
+    conf = new Configuration();
+
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("USER") == null) {
+      System.out.println("Please set USER");
+      System.exit(0);
+    }
+    zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+    File file = new File(zebraJar);
+    if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Please put zebra.jar at hadoop_home/../jars");
+      System.exit(0);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Running realCluster");
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+          .toProperties(conf));
+      pigServer.registerJar(zebraJar);
+      path = new Path("/user/" + System.getenv("USER") + "/TestComparator");
+      // removeDir(path);
+      fs = path.getFileSystem(conf);
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      System.out.println("Running miniCluster");
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+        path = new Path(fs.getWorkingDirectory() + "/TestComparator");
+        // removeDir(path);
+        System.out.println("path1 =" + path);
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+  }
+
+  /**
+   * Verify load/store: load both tables and compare their row and column counts.
+   * 
+   */
+  public static void verifyLoad(String pathTable1, String pathTable2,
+      int numbCols) throws IOException {
+    System.out.println("verifyLoad()");
+
+    // Load table1
+    String query1 = "table1 = LOAD '" + pathTable1
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("verifyLoad() running query : " + query1);
+    pigServer.registerQuery(query1);
+
+    // Load table2
+    String query2 = "table2 = LOAD '" + pathTable2
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("verifyLoad() running query : " + query2);
+    pigServer.registerQuery(query2);
+
+    // Get metrics from first table
+    Iterator<Tuple> it1 = pigServer.openIterator("table1");
+
+    int numbCols1 = 0;
+    long numbRows1 = 0;
+
+    while (it1.hasNext()) {
+      ++numbRows1; // increment row count
+      Tuple rowValue = it1.next();
+      numbCols1 = rowValue.size();
+      if (numbCols != 0)
+        Assert.assertEquals(
+            "Verify failed - Table1 has wrong number of expected columns "
+                + "\n row number : " + numbRows1 + "\n expected column size : "
+                + numbCols + "\n actual columns size  : " + numbCols1,
+            numbCols, numbCols1);
+    }
+
+    // Get metrics from second table
+    Iterator<Tuple> it2 = pigServer.openIterator("table2");
+
+    int numbCols2 = 0;
+    long numbRows2 = 0;
+
+    while (it2.hasNext()) {
+      ++numbRows2; // increment row count
+      Tuple rowValue = it2.next();
+      numbCols2 = rowValue.size();
+      if (numbCols != 0)
+        Assert.assertEquals(
+            "Verify failed - Table2 has wrong number of expected columns "
+                + "\n row number : " + numbRows2 + "\n expected column size : "
+                + numbCols + "\n actual columns size  : " + numbCols2,
+            numbCols, numbCols2);
+    }
+
+    Assert
+        .assertEquals(
+            "Verify failed - Tables have different number row sizes "
+                + "\n table1 rows : " + numbRows1 + "\n table2 rows : "
+                + numbRows2, numbRows1, numbRows2);
+
+    Assert.assertEquals(
+        "Verify failed - Tables have different number column sizes "
+            + "\n table1 column size : " + numbCols1
+            + "\n table2 column size : " + numbCols2, numbCols1, numbCols2);
+
+    System.out.println();
+    System.out.println("Verify load - table1 columns : " + numbCols1);
+    System.out.println("Verify load - table2 columns : " + numbCols2);
+    System.out.println("Verify load - table1 rows : " + numbRows1);
+    System.out.println("Verify load - table2 rows : " + numbRows2);
+    System.out.println("Verify load - PASS");
+  }
+
+  /**
+   * Verify table
+   * 
+   */
+  public static void verifyTable(String pathTable1) throws IOException {
+    System.out.println("verifyTable()");
+
+    // Load table1
+    String query1 = "table1 = LOAD '" + pathTable1
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("verifyTable() running query : " + query1);
+    pigServer.registerQuery(query1);
+
+    // Get metrics from table
+    Iterator<Tuple> it1 = pigServer.openIterator("table1");
+
+    int numbCols1 = 0;
+    long numbRows1 = 0;
+
+    System.out.println("DEBUG starting to iterate table1");
+
+    while (it1.hasNext()) {
+      ++numbRows1; // increment row count
+      Tuple rowValue = it1.next();
+      numbCols1 = rowValue.size();
+    }
+
+    System.out.println();
+    System.out.println("Verify table columns : " + numbCols1);
+    System.out.println("Verify table rows : " + numbRows1);
+    System.out.println("Verify table complete");
+  }
+
+  /**
+   * Verify sorted
+   * 
+   */
+  public static void verifySorted(String pathTable1, String pathTable2,
+      int sortCol, String sortKey, int numbCols, int rowMod)
+      throws IOException, ParseException {
+    System.out.println("verifySorted()");
+
+    // Load table1
+    String query1 = "table1 = LOAD '" + pathTable1
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    System.out.println("verifySorted() running query : " + query1);
+    pigServer.registerQuery(query1);
+
+    //
+    // Get metrics from first table (unsorted)
+    //
+    Iterator<Tuple> it1 = pigServer.openIterator("table1");
+
+    int numbCols1 = 0;
+    long numbRows1 = 0;
+
+    System.out.println("DEBUG starting to iterate table1");
+
+    while (it1.hasNext()) {
+      ++numbRows1; // increment row count
+      Tuple rowValue = it1.next();
+      numbCols1 = rowValue.size();
+      if (numbCols != 0)
+        Assert.assertEquals(
+            "Verify failed - Table1 has wrong number of expected columns "
+                + "\n row number : " + numbRows1 + "\n expected column size : "
+                + numbCols + "\n actual columns size  : " + numbCols1,
+            numbCols, numbCols1);
+    }
+
+    System.out.println();
+    System.out.println("Verify unsorted table1 columns : " + numbCols1);
+    System.out.println("Verify unsorted table1 rows : " + numbRows1);
+
+    System.out.println("\nDEBUG starting to iterate table2");
+
+    //
+    // Get metrics from second table (sorted)
+    //
+    long numbRows2 = verifySortedTable(pathTable2, sortCol, sortKey, numbCols,
+        rowMod, null);
+
+    int numbCols2 = totalNumbCols;
+    long numbVerifiedRows = totalNumbVerifiedRows;
+
+    Assert
+        .assertEquals(
+            "Verify failed - Tables have different number row sizes "
+                + "\n table1 rows : " + numbRows1 + "\n table2 rows : "
+                + numbRows2, numbRows1, numbRows2);
+
+    Assert.assertEquals(
+        "Verify failed - Tables have different number column sizes "
+            + "\n table1 column size : " + numbCols1
+            + "\n table2 column size : " + numbCols2, numbCols1, numbCols2);
+
+    System.out.println();
+    System.out.println("Verify unsorted table1 columns : " + numbCols1);
+    System.out.println("Verify sorted   table2 columns : " + numbCols2);
+    System.out.println("Verify unsorted table1 rows : " + numbRows1);
+    System.out.println("Verify sorted   table2 rows : " + numbRows2);
+    System.out.println("Verify sorted - numb verified rows : "
+        + numbVerifiedRows);
+    System.out.println("Verify sorted - sortCol : " + sortCol);
+    System.out.println("Verify sorted - PASS");
+  }
+
+  /**
+   * Verify merge-join
+   * 
+   */
+  public static void verifyMergeJoin(String pathTable1, int sortCol,
+      String sortKey, int numbCols, int rowMod, String verifyDataColName) throws IOException,
+      ParseException {
+    System.out.println("verifyMergeJoin()");
+
+    //
+    // Verify sorted table
+    //
+    long numbRows = verifySortedTable(pathTable1, sortCol, sortKey, numbCols,
+        rowMod, verifyDataColName);
+
+    System.out.println();
+    System.out.println("Verify merge-join   table columns : " + totalNumbCols);
+    System.out.println("Verify merge-join   table rows : " + numbRows);
+    System.out.println("Verify merge-join - numb verified rows : "
+        + totalNumbVerifiedRows);
+    System.out.println("Verify merge-join - sortCol : " + sortCol);
+    System.out.println("Verify merge-join - PASS");
+  }
+
+  /**
+   * Verify sorted-union
+   * 
+   */
+  public static void verifySortedUnion(ArrayList<String> unionPaths,
+      String pathTable1, int sortCol, String sortKey, int numbCols, int rowMod,
+      String verifyDataColName) throws IOException, ParseException {
+    System.out.println("verifySortedUnion()");
+
+    long numbUnionRows = 0;
+    ArrayList<Long> numbRows = new ArrayList<Long>();
+
+    // Get number of rows from each of the input union tables
+    for (int i = 0; i < unionPaths.size(); ++i) {
+      // Load table1
+      String query1 = "table1 = LOAD '" + unionPaths.get(i)
+          + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+      System.out.println("verifySortedUnion() running query : " + query1);
+      pigServer.registerQuery(query1);
+      String orderby1 = "sort1 = ORDER table1 BY " + sortKey + " ;";
+      System.out.println("orderby1 : " + orderby1);
+      pigServer.registerQuery(orderby1);
+
+      // Get metrics for each input sorted table
+      Iterator<Tuple> it1 = pigServer.openIterator("sort1");
+      long numbRows1 = 0;
+
+      while (it1.hasNext()) {
+        ++numbRows1; // increment row count
+        Tuple rowValue = it1.next();
+      }
+      numbRows.add(numbRows1);
+      numbUnionRows += numbRows1;
+    }
+
+    //
+    // Verify sorted union table
+    //
+    long numbRows1 = verifySortedTable(pathTable1, sortCol, sortKey, numbCols,
+        rowMod, verifyDataColName);
+
+   
+    //
+    // Print all union input tables and rows for each
+    //
+    System.out.println();
+    for (int i = 0; i < unionPaths.size(); ++i) {
+      System.out.println("Input union table" + i + " path  : "
+          + unionPaths.get(i));
+      System.out.println("Input union table" + i + " rows  : "
+          + numbRows.get(i));
+    }
+    System.out.println();
+    System.out.println("Input union total rows   : " + numbUnionRows);
+
+    System.out.println();
+    System.out.println("Verify union - table columns : " + totalNumbCols);
+    System.out.println("Verify union - table rows : " + numbRows1);
+    System.out.println("Verify union - numb verified rows : "
+        + totalNumbVerifiedRows);
+    System.out.println("Verify union - sortCol : " + sortCol);
+
+  /*  Assert.assertEquals(
+        "Verify failed - sorted union table row comparison error "
+            + "\n expected table rows : " + numbUnionRows
+            + "\n actual table rows : " + numbRows1, numbUnionRows, numbRows1);
+*/
+    System.out.println("Verify union - PASS");
+  }
+
+  /**
+   * Create unsorted table
+   * 
+   */
+  public static void createtable(String pathTable1, long numbRows, int seed,
+      boolean debug) throws ExecException, IOException, ParseException {
+    System.out.println("createtable()");
+
+    Path unsortedPath = new Path(pathTable1);
+
+    // Remove old table (if present)
+    removeDir(unsortedPath);
+
+    // Create table
+    BasicTable.Writer writer = new BasicTable.Writer(unsortedPath,
+        TABLE_SCHEMA, TABLE_STORAGE, conf);
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    TableInserter inserter = writer.getInserter("ins", false);
+
+    Map<String, String> m1 = new HashMap<String, String>();
+
+    Tuple tupRecord1; // record
+    tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+        .getSchema()); // r1 schema
+
+    DataBag bag1 = TypesUtils.createBag();
+    Schema schColl = schema.getColumnSchema("c1").getSchema(); // c1 schema
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+
+    // Random range to allow for duplicate values (at least 1 so nextInt() stays legal).
+    int randRange = Math.max(1, (int) (numbRows / 10));
+    for (int i = 0; i < numbRows; ++i) {
+      int random = generator.nextInt(randRange);
+
+      TypesUtils.resetTuple(tuple); // reset row tuple
+      m1.clear(); // reset map
+      TypesUtils.resetTuple(tupRecord1); // reset record
+      TypesUtils.resetTuple(tupColl1); // reset collection
+      TypesUtils.resetTuple(tupColl2);
+      bag1.clear();
+
+      tuple.set(0, i); // count
+      tuple.set(1, seed); // seed
+
+      tuple.set(2, i); // int1
+      tuple.set(3, random); // int2
+      tuple.set(4, "string " + i); // str1
+      tuple.set(5, "string random " + random); // str2
+      tuple.set(6, new DataByteArray("byte " + i)); // byte1
+      tuple.set(7, new DataByteArray("byte random " + random)); // byte2
+
+      tuple.set(8, new Float(i * -1)); // float1 negative
+      tuple.set(9, new Long(numbRows - i)); // long1 reverse
+      tuple.set(10, new Double(i * 100)); // double1
+
+      // insert map1
+      m1.put("a", "m1");
+      m1.put("b", "m1 " + i);
+      tuple.set(11, m1);
+
+      // insert record1
+      tupRecord1.set(0, "r1 " + seed);
+      tupRecord1.set(1, "r1 " + i);
+      tuple.set(12, tupRecord1);
+
+      // insert collection1
+      tupColl1.set(0, "c1 a " + seed);
+      tupColl1.set(1, "c1 a " + i);
+      bag1.add(tupColl1); // first collection item
+
+      tupColl2.set(0, "c1 b " + seed);
+      tupColl2.set(1, "c1 b " + i);
+      bag1.add(tupColl2); // second collection item
+
+      tuple.set(13, bag1);
+
+      inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+    }
+    inserter.close();
+    writer.close();
+
+    if (debug == true) {
+      // Load tables
+      String query1 = "table1 = LOAD '" + unsortedPath.toString()
+          + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+      pigServer.registerQuery(query1);
+
+      // Print Table
+      printTable("table1");
+    }
+
+    System.out.println("Table Path : " + unsortedPath);
+  }
+
+  /**
+   * Create sorted table
+   * 
+   */
+  public static void createsortedtable(String pathTable1, String pathTable2,
+      String sortString, boolean debug) throws ExecException, IOException {
+    System.out.println("createsortedtable()");
+
+    Path unsortedPath = new Path(pathTable1);
+    Path sortedPath = new Path(pathTable2);
+
+    // Remove old table (if present)
+    removeDir(sortedPath);
+
+    // Load tables
+    String query1 = "table1 = LOAD '" + unsortedPath.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query1);
+
+    // Sort table
+    String orderby1 = "sort1 = ORDER table1 BY " + sortString + " ;";
+    System.out.println("orderby1 : " + orderby1);
+    pigServer.registerQuery(orderby1);
+
+    // Store sorted tables
+    pigJob = pigServer.store("sort1", sortedPath.toString(), TableStorer.class
+        .getCanonicalName()
+        + "('" + TABLE_STORAGE + "')");
+    Assert.assertNull(pigJob.getException());
+
+    // Print Table
+    if (debug == true)
+      printTable("sort1");
+
+    System.out.println("Sorted Path : " + sortedPath);
+  }
+
+  /**
+   * Delete table
+   * 
+   */
+  public static void deleteTable(String pathTable1) throws ExecException,
+      IOException {
+    System.out.println("deleteTable()");
+
+    Path tablePath = new Path(pathTable1);
+
+    // Remove table (if present)
+    removeDir(tablePath);
+
+    System.out.println("Deleted Table Path : " + tablePath);
+  }
+
+  /**
+   * Verify sorted table
+   * 
+   * Using BasicTable.Reader, read all table rows and verify that sortCol is in
+   * sorted order
+   * 
+   */
+  private static long verifySortedTable(String pathTable1, int sortCol,
+      String sortKey, int numbCols, int rowMod, String verifyDataColName)
+      throws IOException, ParseException {
+
+    long numbRows = 0;
+
+    Path tablePath = new Path(pathTable1);
+
+    BasicTable.Reader reader = new BasicTable.Reader(tablePath, conf);
+   
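+    // Request a single split over the sorted table so the scanner below walks rows in key order.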
+    JobConf conf1 = new JobConf(conf);
+    System.out.println("sortKey: " + sortKey);
+    TableInputFormat.setInputPaths(conf1, new Path(pathTable1));
+ 
+    TableInputFormat.requireSortedTable(conf1, null);
+    TableInputFormat tif = new TableInputFormat();
+ 
+    SortedTableSplit split = (SortedTableSplit) tif.getSplits(conf1, 1)[0];
+    
+    TableScanner scanner = reader.getScanner(split.getBegin(), split.getEnd(), true);
+    BytesWritable key = new BytesWritable();
+    Tuple rowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    Object lastVal = null;
+    int numbCols1 = 0;
+    long numbVerifiedRows = 0;
+
+    while (!scanner.atEnd()) {
+      ++numbRows;
+      scanner.getKey(key);
+
+      scanner.getValue(rowValue);
+
+      // Verify every nth row
+      if ((numbRows % rowMod) == 0) {
+        ++numbVerifiedRows;
+        numbCols1 = rowValue.size();
+        if (numbCols != 0)
+          Assert.assertEquals(
+              "Verify failed - Table1 has wrong number of expected columns "
+                  + "\n row numberrr : " + numbRows
+                  + "\n expected column size : " + numbCols
+                  + "\n actual columns size  : " + numbCols1, numbCols,
+              numbCols1);
+
+        Object newVal = rowValue.get(sortCol);
+
+        // Verify sort key is in sorted order
+        Assert.assertTrue("Verify failed - Table1 sort comparison error "
+            + "\n row number : " + numbRows + "\n sort column : " + sortCol
+            + "\n sort column last value    : " + lastVal
+            + "\n sort column current value : " + newVal, compareTo(newVal,
+            lastVal) >= 0);
+
+        lastVal = newVal; // save last compare value
+
+        //
+        // Optionally verify data
+        //
+       
+        if (verifyDataColName != null && verifyDataColName.equals("long1")) {
+          Object newValLong1 = rowValue.get(sortCol);
+          if (numbRows < 2000){
+            System.out.println("Row : "+ (numbRows-1) +" long1 value : "+newValLong1.toString());
+          }
+          Assert.assertEquals(
+              "Verify failed - Union table data verification error for column name : "
+                  + verifyDataColName + "\n row number : " + (numbRows-1)
+                  + "\n expected value : " + (numbRows-1 + 4) / 4 + // long1 will start with value 1
+                  "\n actual value   : " + newValLong1, (numbRows-1 + 4) / 4,
+              newValLong1);
+
+        }
+
+      }
+
+      // Always advance to the next row, even when it is not selected for verification.
+      scanner.advance();
+    }
+
+    System.out.println("\nTable Path : " + pathTable1);
+    System.out.println("Table Row number : " + numbRows);
+
+    reader.close();
+
+    totalNumbCols = numbCols1;
+    totalNumbVerifiedRows = numbVerifiedRows;
+
+    return numbRows;
+  }
+
+  /**
+   * Print table rows
+   * 
+   * Print the first number of specified table rows
+   * 
+   */
+  public static void printRows(String pathTable1, long numbRows)
+      throws IOException {
+    System.out.println("printRows()");
+
+    // Load table1
+    String query1 = "table1 = LOAD '" + pathTable1
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query1);
+    
+   //
+    // Get metrics from first table (unsorted)
+    //
+    long count = 0;
+    Iterator<Tuple> it1 = pigServer.openIterator("table1");
+
+    while (it1.hasNext()) {
+      ++count;
+      if (count > numbRows)
+        break;
+      Tuple RowValue1 = it1.next();
+      System.out.println();
+      for (int i = 0; i < RowValue1.size(); ++i)
+        System.out.println("DEBUG: " + "table" + " RowValue.get(" + i + ") = "
+            + RowValue1.get(i));
+    }
+    System.out.println("\nTable Path : " + pathTable1);
+    System.out.println("Table Rows Printed : " + numbRows);
+  }
+  
+  /**
+   * Print the total number of rows in the table
+   * 
+   */
+  public static void printRowNumber(String pathTable1, String sortKey)
+      throws IOException, ParseException {
+    long numbRows = 0;
+
+    Path tablePath = new Path(pathTable1);
+
+    BasicTable.Reader reader = new BasicTable.Reader(tablePath, conf);
+   
+    JobConf conf1 = new JobConf(conf);
+    System.out.println("sortKey: " + sortKey);
+    TableInputFormat.setInputPaths(conf1, new Path(pathTable1));
+
+    TableInputFormat.requireSortedTable(conf1, null);
+    TableInputFormat tif = new TableInputFormat();
+
+  
+    TableScanner scanner = reader.getScanner(null, null, true);
+    BytesWritable key = new BytesWritable();
+    Tuple rowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    while (!scanner.atEnd()) {
+      ++numbRows;
+      scanner.getKey(key);
+      scanner.advance();
+    }
+    System.out.println("\nTable Path : " + pathTable1);
+    System.out.println("Table Row number : " + numbRows);
+  }
+  /**
+   * Compare table rows
+   * 
+   */
+  private static boolean compareRow(Tuple rowValues1, Tuple rowValues2)
+      throws IOException {
+    boolean result = true;
+    Assert.assertEquals(rowValues1.size(), rowValues2.size());
+    for (int i = 0; i < rowValues1.size(); ++i) {
+      if (!compareObj(rowValues1.get(i), rowValues2.get(i))) {
+        System.out.println("DEBUG: " + " RowValue.get(" + i
+            + ") value compare error : " + rowValues1.get(i) + " : "
+            + rowValues2.get(i));
+        result = false;
+        break;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Compare table values
+   * 
+   */
+  private static boolean compareObj(Object object1, Object object2) {
+    if (object1 == null) {
+      if (object2 == null)
+        return true;
+      else
+        return false;
+    } else if (object1.equals(object2))
+      return true;
+    else
+      return false;
+  }
+
+  /**
+   * Compares two objects that implement the Comparable interface
+   * 
+   * Zebra's supported "sort" types (String, DataByteArray, Integer, Float,
+   * Long, Double, and Boolean) all implement the Comparable interface.
+   * 
+   * Returns a negative integer, zero, or a positive integer if object1 is less
+   * than, equal to, or greater than object2.
+   * 
+   */
+  private static int compareTo(Object object1, Object object2) {
+    if (object1 == null) {
+      if (object2 == null)
+        return 0;
+      else
+        return -1;
+    } else if (object2 == null) {
+      return 1;
+    } else
+      return ((Comparable) object1).compareTo((Comparable) object2);
+  }
+
+  /**
+   * Print Table Metadata Info (for debugging)
+   * 
+   */
+  private static void printTableInfo(String pathString) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(bos);
+    System.out.println("start dumpinfo ===========");
+    BasicTable.dumpInfo(pathString, ps, conf);
+
+    System.out.println("bos.toString() : " + bos.toString());
+  }
+
+  /**
+   * Print Pig Table (for debugging)
+   * 
+   */
+  private static int printTable(String tablename) throws IOException {
+    Iterator<Tuple> it1 = pigServer.openIterator(tablename);
+    int numbRows = 0;
+    while (it1.hasNext()) {
+      Tuple RowValue1 = it1.next();
+      ++numbRows;
+      System.out.println();
+      for (int i = 0; i < RowValue1.size(); ++i)
+        System.out.println("DEBUG: " + tablename + " RowValue.get(" + i
+            + ") = " + RowValue1.get(i));
+    }
+    System.out.println("\nRow count : " + numbRows);
+    return numbRows;
+  }
+
+  /**
+   * Remove directory
+   * 
+   */
+  public static void removeDir(Path outPath) throws IOException {
+    String command = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
+          + outPath.toString();
+    } else {
+      command = "rm -rf " + outPath.toString();
+    }
+    Runtime runtime = Runtime.getRuntime();
+    Process proc = runtime.exec(command);
+    int exitVal = -1;
+    try {
+      exitVal = proc.waitFor();
+    } catch (InterruptedException e) {
+      System.err.println(e);
+    }
+  }
+
+  /**
+   * Calculate elapsed time
+   * 
+   */
+  private static String printTime(long start, long stop) {
+    long time = (stop - start) / 1000; // elapsed whole seconds
+    return String.format("%02d:%02d:%02d", time / 3600, (time % 3600) / 60,
+        time % 60);
+  }
+
+  /**
+   * Main
+   * 
+   * Command line options:
+   * 
+   * -verifyOption : <load, sort, merge-join, sorted-union, dump, tableinfo,
+   * createtable, createsorttable, deletetable, printrows, printrownumber>
+   * 
+   * -pathTable1 : <hdfs path> -pathTable2 : <hdfs path>
+   * 
+   * -pathUnionTables : <hdfs path> <hdfs path> ...
+   * 
+   * -rowMod : verify every nth row (optional)
+   * 
+   * -numbCols : number of columns table should have (optional)
+   * 
+   * -sortCol : for sort option (default is column 0)
+   * 
+   * -sortString : sort string for sort option
+   * 
+   * -verifyDataColName : column name whose data values should also be verified
+   * (optional, currently only "long1" is supported)
+   * 
+   * -numbRows : number of rows for new table to create
+   * 
+   * -seed : unique seed value stored in the seed column of newly created tables
+   * 
+   * -debug : print out debug info with results (use caution; for example, do
+   * not use when creating large tables)
+   * 
+   * examples:
+   * 
+   * java -DwhichCluster="realCluster" -DHADOOP_HOME=$HADOOP_HOME -DUSER=$USER
+   * ToolTestComparator -verifyOption load -pathTable1 /user/hadoopqa/table1
+   * -pathTable2 /user/hadoopqa/table2
+   * 
+   * java -DwhichCluster="realCluster" -DHADOOP_HOME=$HADOOP_HOME -DUSER=$USER
+   * ToolTestComparator -verifyOption sort -pathTable1 /user/hadoopqa/table1
+   * -pathTable2 /user/hadoopqa/table2 -sortCol 0
+   * 
+   * java -DwhichCluster="realCluster" -DHADOOP_HOME=$HADOOP_HOME -DUSER=$USER
+   * ToolTestComparator -verifyOption merge-join -pathTable1 /user/hadoopqa/table1
+   * -sortCol 0
+   * 
+   * java -DwhichCluster="realCluster" -DHADOOP_HOME=$HADOOP_HOME -DUSER=$USER
+   * ToolTestComparator -verifyOption sorted-union -pathTable1
+   * /user/hadoopqa/unionTable1 -pathUnionTables /user/hadoopqa/inputTable1
+   * /user/hadoopqa/inputTable2 /user/hadoopqa/inputTable3 -sortCol 0 -rowMod 5
+   * 
+   * java -DwhichCluster="realCluster" -DHADOOP_HOME=$HADOOP_HOME -DUSER=$USER
+   * ToolTestComparator -verifyOption dump -pathTable1 /user/hadoopqa/table1
+   * 
+   * @param args
+   */
+
+  public static void main(String[] args) {
+    long startTime = System.currentTimeMillis();
+
+    System.out.println("Running Zebra TestComparator");
+    try {
+      ArrayList<String> unionPaths = new ArrayList<String>();
+      String verifyOption = null;
+      String pathTable1 = null;
+      String pathTable2 = null;
+      String sortString = null;
+      String verifyDataColName = null;
+      int rowMod = 1; // default to verify every table row
+      int numbCols = 0; // if provided, verify that table has these number of
+      // columns
+      int sortCol = 0; // default to first column as sort index
+      long numbRows = 0; // number of rows to create for new table
+      int seed = 0; // used for creating new tables
+      boolean debug = false;
+
+      // Read arguments
+      if (args.length >= 2) {
+        for (int i = 0; i < args.length; ++i) {
+
+          if (args[i].equals("-verifyOption")) {
+            verifyOption = args[++i];
+          } else if (args[i].equals("-pathTable1")) {
+            pathTable1 = args[++i];
+          } else if (args[i].equals("-pathTable2")) {
+            pathTable2 = args[++i];
+          } else if (args[i].equals("-pathUnionTables")) {
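+            // Consume path arguments until the next "-" flag, then step back
+            // one so the outer loop re-reads that flag.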
+            while (++i < args.length && !args[i].startsWith("-")) {
+              System.out.println("args[i] : " + args[i]);
+              unionPaths.add(args[i]);
+            }
+            if (i < args.length)
+              --i;
+          } else if (args[i].equals("-rowMod")) {
+            rowMod = new Integer(args[++i]).intValue();
+          } else if (args[i].equals("-sortString")) {
+            sortString = args[++i];
+          } else if (args[i].equals("-sortCol")) {
+            sortCol = new Integer(args[++i]).intValue();
+          } else if (args[i].equals("-numbCols")) {
+            numbCols = new Integer(args[++i]).intValue();
+          } else if (args[i].equals("-numbRows")) {
+            numbRows = Long.parseLong(args[++i]);
+          } else if (args[i].equals("-seed")) {
+            seed = new Integer(args[++i]).intValue();
+          } else if (args[i].equals("-verifyDataColName")) {
+            verifyDataColName = args[++i];
+          } else if (args[i].equals("-debug")) {
+            debug = true;
+          } else {
+            System.out.println("Exiting - unknown argument : " + args[i]);
+            System.exit(0);
+          }
+        }
+      } else {
+        System.out
+            .println("Error - need to provide required comparator arguments");
+        System.exit(0);
+      }
+
+      // Setup environment
+      setUp();
+
+      //
+      // Run appropriate verify option
+      //
+      if (verifyOption == null) {
+        System.out.println("Exiting -verifyOption not set");
+        System.exit(0);
+      }
+
+      if (verifyOption.equals("load")) {
+        // Verify both tables are equal
+        verifyLoad(pathTable1, pathTable2, numbCols);
+      } else if (verifyOption.equals("sort")) {
+        // Verify table is in sorted order
+        verifySorted(pathTable1, pathTable2, sortCol, sortString, numbCols,
+            rowMod);
+      } else if (verifyOption.equals("merge-join")) {
+        // Verify merge-join table is in sorted order
+        verifyMergeJoin(pathTable1, sortCol, sortString, numbCols, rowMod,
+            verifyDataColName);
+      } else if (verifyOption.equals("sorted-union")) {
+        // Verify sorted-union table is in sorted order
+        verifySortedUnion(unionPaths, pathTable1, sortCol, sortString,
+            numbCols, rowMod, verifyDataColName);
+      } else if (verifyOption.equals("dump")) {
+        // Dump table info
+        printTableInfo(pathTable1);
+      } else if (verifyOption.equals("tableinfo")) {
+        // Verify table to get row and column info
+        verifyTable(pathTable1);
+      } else if (verifyOption.equals("deletetable")) {
+        // Delete table directory
+        deleteTable(pathTable1);
+      } else if (verifyOption.equals("printrows")) {
+        // Print some table rows
+        printRows(pathTable1, numbRows);
+      } else if (verifyOption.equals("createtable")) {
+        // Create unsorted table
+        createtable(pathTable1, numbRows, seed, debug);
+      } else if (verifyOption.equals("createsorttable")) {
+        // Create sorted table
+        createsortedtable(pathTable1, pathTable2, sortString, debug);
+      } else if (verifyOption.equals("printrownumber")) {
+        // Print the total number of rows in the table
+        printRowNumber(pathTable1, sortString);
+      } else {
+        System.out.println("Exiting - unknown -verifyOption value : "
+            + verifyOption);
+        System.exit(0);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    long stopTime = System.currentTimeMillis();
+    System.out.println("\nElapsed time : " + printTime(startTime, stopTime)
+        + "\n");
+  }
+
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestGlobTableLoader{
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+  private static Configuration conf;
+  private static String zebraJar;
+  private static String whichCluster;
+  private static FileSystem fs;
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    // if whichCluster is not defined, or is set to something other than
+    // "realCluster" or "miniCluster", set it to "miniCluster"
+    if (System.getProperty("whichCluster") == null
+        || ((!System.getProperty("whichCluster")
+            .equalsIgnoreCase("realCluster")) && (!System.getProperty(
+            "whichCluster").equalsIgnoreCase("miniCluster")))) {
+      System.setProperty("whichCluster", "miniCluster");
+      whichCluster = System.getProperty("whichCluster");
+    } else {
+      whichCluster = System.getProperty("whichCluster");
+    }
+
+    System.out.println("cluster: " + whichCluster);
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("HADOOP_HOME") == null) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(0);
+    }
+
+    conf = new Configuration();
+
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("USER") == null) {
+      System.out.println("Please set USER");
+      System.exit(0);
+    }
+    zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+    File file = new File(zebraJar);
+    if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Please put zebra.jar at hadoop_home/../jars");
+      System.exit(0);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+          .toProperties(conf));
+      pigServer.registerJar(zebraJar);
+      pathTable = new Path("/user/" + System.getenv("USER")
+          + "/TestMapTableLoader");
+      removeDir(pathTable);
+      fs = pathTable.getFileSystem(conf);
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+        pathTable = new Path(fs.getWorkingDirectory()
+            + "/TestMapTableLoader1");
+        removeDir(pathTable);
+        System.out.println("path1 =" + pathTable);
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+
+
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "m1:map(string)", "[m1#{a}]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
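+    // Each batch writes one identical map row per inserter, so keys "key0" and
+    // "key1" each appear numsBatch times in the table.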
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("a", "x");
+        map.put("b", "y");
+        map.put("c", "z");
+        tuple.set(0, map);
+
+        try {
+          inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+        } catch (Exception e) {
+          System.out.println(e.getMessage());
+        }
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+  public static void removeDir(Path outPath) throws IOException {
+    String command = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+    command = System.getenv("HADOOP_HOME") +"/bin/hadoop fs -rmr " + outPath.toString();
+    }
+    else{
+    command = "rm -rf " + outPath.toString();
+    }
+    Runtime runtime = Runtime.getRuntime();
+    Process proc = runtime.exec(command);
+    int exitVal = -1;
+    try {
+      exitVal = proc.waitFor();
+    } catch (InterruptedException e) {
+      System.err.println(e);
+    }
+    
+  }
+
+  // @Test
+  public void test1() throws IOException, ParseException {
+    String projection = new String("m1#{b}");
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+    // HashMap<String, Object> mapval;
+    while (!scanner.atEnd()) {
+      scanner.getKey(key);
+      // Assert.assertEquals(key, new BytesWritable("key0".getBytes()));
+      scanner.getValue(value);
+      System.out.println("key = " + key + " value = " + value);
+
+      // mapval = (HashMap<String, Object>) value.get(0);
+      // Assert.assertEquals("x", mapval.get("a"));
+      // Assert.assertEquals(null, mapval.get("b"));
+      // Assert.assertEquals(null, mapval.get("c"));
+      scanner.advance();
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testReader() throws ExecException, IOException {
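+    // Load the table through a glob pattern ("{TestMapTableLoader1}") in the
+    // path to exercise TableLoader's glob handling.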
+    pathTable = new Path("/user/" + System.getenv("USER")
+        + "/{TestMapTableLoader1}");
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{a}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestOrderPreserveMultiTableGlob {
+	
+	final static int NUMB_TABLE = 10;  		// number of tables for stress test
+	final static int NUMB_TABLE_ROWS = 5;	// number of rows for each table
+	
+	final static String TABLE_SCHEMA = "int1:int,str1:string,byte1:bytes";
+	final static String TABLE_STORAGE = "[int1,str1,byte1]";
+	
+	static int fileId = 0;
+	static int sortId = 0;
+	
+	protected static ExecType execType = ExecType.MAPREDUCE;
+	private static MiniCluster cluster;
+	protected static PigServer pigServer;
+	protected static ExecJob pigJob;
+	
+	private static ArrayList<Path> pathTables;
+	private static int totalTableRows =0;
+	
+	private static Configuration conf;
+	private static FileSystem fs;
+	
+	private static String zebraJar;
+	private static String whichCluster;
+	
+	@BeforeClass
+	public static void setUp() throws Exception {
+		if (System.getProperty("hadoop.log.dir") == null) {
+			String base = new File(".").getPath(); // getAbsolutePath();
+			System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+		}
+		
+		// if whichCluster is not defined, or is set to something other than
+		// "realCluster" or "miniCluster", set it to "miniCluster"
+		if (System.getProperty("whichCluster") == null
+				|| ((!System.getProperty("whichCluster")
+						.equalsIgnoreCase("realCluster")) && (!System.getProperty(
+						"whichCluster").equalsIgnoreCase("miniCluster")))) {
+			System.setProperty("whichCluster", "miniCluster");
+			whichCluster = System.getProperty("whichCluster");
+		} else {
+			whichCluster = System.getProperty("whichCluster");
+		}
+		
+		System.out.println("cluster: " + whichCluster);
+		if (whichCluster.equalsIgnoreCase("realCluster")
+				&& System.getenv("HADOOP_HOME") == null) {
+			System.out.println("Please set HADOOP_HOME");
+			System.exit(0);
+		}
+		
+		conf = new Configuration();
+		
+		if (whichCluster.equalsIgnoreCase("realCluster")
+				&& System.getenv("USER") == null) {
+			System.out.println("Please set USER");
+			System.exit(0);
+		}
+		zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+		File file = new File(zebraJar);
+		if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+			System.out.println("Please put zebra.jar at hadoop_home/../jars");
+			System.exit(0);
+		}
+		
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+					.toProperties(conf));
+			pigServer.registerJar(zebraJar);
+			
+			pathTables = new ArrayList<Path>();
+			for (int i=0; i<NUMB_TABLE; ++i) {
+				Path pathTable = new Path("/user/" + System.getenv("USER")
+						+ "/TestOrderPreserveMultiTable" + i);
+				pathTables.add(pathTable);
+				removeDir(pathTable);
+			}
+			fs = pathTables.get(0).getFileSystem(conf);
+		}
+		
+		if (whichCluster.equalsIgnoreCase("miniCluster")) {
+			if (execType == ExecType.MAPREDUCE) {
+				cluster = MiniCluster.buildCluster();
+				pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+				fs = cluster.getFileSystem();
+				
+				pathTables = new ArrayList<Path>();
+				for (int i=0; i<NUMB_TABLE; ++i) {
+					Path pathTable = new Path(fs.getWorkingDirectory()
+							+ "/TestOrderPreserveMultiTable" + i);
+					pathTables.add(pathTable);
+					removeDir(pathTable);
+				}
+			} else {
+				pigServer = new PigServer(ExecType.LOCAL);
+			}
+		}
+		
+		// Create tables
+		for (int i=0; i<NUMB_TABLE; ++i) {
+			// Create table data
+			Object[][] table = new Object[NUMB_TABLE_ROWS][3];  // three columns
+			
+			for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
+				table[j][0] = i;
+				table[j][1] = new String("string" + j);
+				table[j][2] = new DataByteArray("byte" + (NUMB_TABLE_ROWS - j));
+				++totalTableRows;
+			}
+			// Create table
+			createTable(pathTables.get(i), TABLE_SCHEMA, TABLE_STORAGE, table);
+			
+			// Load Table
+			String query = "table" + i + " = LOAD '" + pathTables.get(i).toString() + 
+					"' USING org.apache.hadoop.zebra.pig.TableLoader();";
+			pigServer.registerQuery(query);
+		}
+	}
+	
+	private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+			throws IOException {
+		//
+		// Create table from tableData array
+		//
+		BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+		
+		Schema schema = writer.getSchema();
+		Tuple tuple = TypesUtils.createTuple(schema);
+		TableInserter inserter = writer.getInserter("ins", false);
+		
+		for (int i = 0; i < tableData.length; ++i) {
+			TypesUtils.resetTuple(tuple);
+			for (int k = 0; k < tableData[i].length; ++k) {
+				tuple.set(k, tableData[i][k]);
+				System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+			}
+			inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+		}
+		inserter.close();
+		writer.close();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pigServer.shutdown();
+	}
+	
+	public static void removeDir(Path outPath) throws IOException {
+		String command = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			command = System.getenv("HADOOP_HOME") +"/bin/hadoop fs -rmr " + outPath.toString();
+		}
+		else{
+			command = "rm -rf " + outPath.toString();
+		}
+		Runtime runtime = Runtime.getRuntime();
+		Process proc = runtime.exec(command);
+		int exitVal = -1;
+		try {
+			exitVal = proc.waitFor();
+		} catch (InterruptedException e) {
+			System.err.println(e);
+		}
+	}
+	
+	private Iterator<Tuple> testOrderPreserveUnion(ArrayList<String> inputTables, String sortkey, String columns)
+				throws IOException {
+		//
+		// Test order preserve union from input tables and provided output columns
+		//
+		Assert.assertTrue("Table union requires two or more input tables", inputTables.size() >= 2);
+		
+		Path newPath = new Path(getCurrentMethodName());
+		ArrayList<String> pathList = new ArrayList<String>();
+		
+		// Load and store each of the input tables
+		for (int i=0; i<inputTables.size(); ++i) {
+			String tablename = inputTables.get(i);
+			String sortName = "sort" + ++sortId;
+			
+			// Sort tables
+			String orderby = sortName + " = ORDER " + tablename + " BY " + sortkey + " ;";
+			pigServer.registerQuery(orderby);
+			
+			String sortPath = new String(newPath.toString() + ++fileId);  // increment fileId suffix
+			
+			// Store sorted tables
+			pigJob = pigServer.store(sortName, sortPath, TableStorer.class.getCanonicalName() +
+				"('" + TABLE_STORAGE + "')");
+			Assert.assertNull(pigJob.getException());
+			
+			pathList.add(sortPath);  // add table path to list
+		}
+		
+		String paths = "";
+
+		paths += newPath.toString() + "{";
+		fileId = 0;
+		for (String path : pathList)
+			paths += ++fileId + ",";
+		paths = paths.substring(0, paths.lastIndexOf(","));  // remove trailing comma
+		paths += "}";
+		
+		String queryLoad = "records1 = LOAD '"
+	        + paths
+	        +	"' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
+		
+		System.out.println("queryLoad: " + queryLoad);
+		pigServer.registerQuery(queryLoad);
+		
+		// Return iterator
+		Iterator<Tuple> it1 = pigServer.openIterator("records1");
+		return it1;
+	}
+	
+	@Test
+	public void test_sorted_union_multi_table() throws ExecException, IOException {
+		//
+		// Test sorted union
+		//
+		
+		// Create input tables for order preserve union
+		ArrayList<String> inputTables = new ArrayList<String>();  // Input tables
+		for (int i=0; i<NUMB_TABLE; ++i) {
+			inputTables.add("table" + i);  // add input table
+		}
+		
+		// Test with input tables and provided output columns
+		testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1");
+		
+		// Create results table for verification
+		ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+		for (int i=0; i<NUMB_TABLE; ++i) {
+			for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
+				ArrayList<Object> resultRow = new ArrayList<Object>();
+				
+				resultRow.add(i);	// int1
+				resultRow.add(new String("string" + j));	// str1
+				resultRow.add(new DataByteArray("byte" + (NUMB_TABLE_ROWS - j)));	// byte1
+				
+				resultTable.add(resultRow);
+			}
+		}
+		
+		// Verify union table
+		Iterator<Tuple> it = pigServer.openIterator("records1");
+		int numbRows = verifyTable(resultTable, 0, it);
+		
+		Assert.assertEquals(totalTableRows, numbRows);
+		
+		// Print Table
+		//printTable("records1");
+	}
+	
+	/**
+	 * Verify union output table with expected results
+	 * 
+	 */
+	private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException {
+		int numbRows = 0;
+		int index = 0;
+		Object value = resultTable.get(index).get(keyColumn);  // get value of primary key
+		
+		while (it.hasNext()) {
+			Tuple rowValues = it.next();
+			
+			// If last primary sort key does not match then search for next matching key
+			if (! compareObj(value, rowValues.get(keyColumn))) {
+				int subIndex = index + 1;
+				while (subIndex < resultTable.size()) {
+					if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) {  // found new key
+						index = subIndex;
+						value = resultTable.get(index).get(keyColumn);
+						break;
+					}
+					++subIndex;
+				}
+				Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
+					+ rowValues.get(keyColumn), value, rowValues.get(keyColumn));
+			}
+			// Search for matching row with this primary key
+			int subIndex = index;
+			
+			while (subIndex < resultTable.size()) {
+				// Compare row
+				ArrayList<Object> resultRow = resultTable.get(subIndex);
+				if ( compareRow(rowValues, resultRow) )
+					break; // found matching row
+				++subIndex;
+				// Fail if the expected rows for this key are exhausted before a match is found
+				Assert.assertTrue("Table comparison error for row : " + numbRows + " - no matching row found for : "
+					+ rowValues.get(keyColumn), subIndex < resultTable.size());
+				Assert.assertEquals("Table comparison error for row : " + numbRows + " - no matching row found for : "
+					+ rowValues.get(keyColumn), value, resultTable.get(subIndex).get(keyColumn));
+			}
+			++numbRows;
+		}
+		Assert.assertEquals(resultTable.size(), numbRows);  // verify expected row count
+		return numbRows;
+	}
+	
+	/**
+	 * Compare table rows
+	 * 
+	 */
+	private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
+		boolean result = true;
+		Assert.assertEquals(resultRow.size(), rowValues.size());
+		for (int i = 0; i < rowValues.size(); ++i) {
+			if (! compareObj(rowValues.get(i), resultRow.get(i)) ) {
+				result = false;
+				break;
+			}
+		}
+		return result;
+	}
+	
+	/**
+	 * Compare table values
+	 * 
+	 */
+	private boolean compareObj(Object object1, Object object2) {
+		if (object1 == null)
+			return object2 == null;
+		return object1.equals(object2);
+	}
+	
+	/**
+	 * Print Pig Table (for debugging)
+	 * 
+	 */
+	private int printTable(String tablename) throws IOException {
+		Iterator<Tuple> it1 = pigServer.openIterator(tablename);
+		int numbRows = 0;
+		while (it1.hasNext()) {
+			Tuple RowValue1 = it1.next();
+			++numbRows;
+			System.out.println();
+			for (int i = 0; i < RowValue1.size(); ++i)
+				System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + RowValue1.get(i));
+		}
+		System.out.println("\nRow count : " + numbRows);
+		return numbRows;
+	}
+	
+	/**
+	 * Return the name of the routine that called getCurrentMethodName
+	 * 
+	 */
+	private String getCurrentMethodName() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintWriter pw = new PrintWriter(baos);
+		(new Throwable()).printStackTrace(pw);
+		pw.flush();
+		String stackTrace = baos.toString();
+		pw.close();
+		
+		StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+		tok.nextToken(); // 'java.lang.Throwable'
+		tok.nextToken(); // 'at ...getCurrentMethodName'
+		String l = tok.nextToken(); // 'at ...<caller of getCurrentMethodName>'
+		// Parse the third stack-trace line
+		tok = new StringTokenizer(l.trim(), " <(");
+		String t = tok.nextToken(); // 'at'
+		t = tok.nextToken(); // '...<caller of getCurrentMethodName>'
+		return t;
+	}
+	
+}

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java Fri Jan  8 18:17:07 2010
@@ -455,7 +455,7 @@
 		} finally {
 			//System.out.println(getStackTrace(exception));
 			Assert.assertNotNull(exception);
-			Assert.assertTrue(getStackTrace(exception).contains("Schema file doesn't exist"));
+      Assert.assertTrue(getStackTrace(exception).contains("Input path does not exist: "));
 		}
 	}
 	
@@ -465,6 +465,7 @@
 		// Test sorted union error handling when one of the table paths is invalid (Negative test)
 		//
 		IOException exception = null;
+		String pathSort2 = null;
 		
 		try {
 			// Sort tables
@@ -479,7 +480,7 @@
 				"('" + TABLE1_STORAGE + "')");
 			Assert.assertNull(pigJob.getException());
 			
-			String pathSort2 = newPath.toString() + "2";  // invalid path
+			pathSort2 = newPath.toString() + "2";  // invalid path
 			
 			String queryLoad = "records1 = LOAD '"
 		        + pathSort1 + ","

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageRecord2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageRecord2.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageRecord2.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageRecord2.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashSet;
+import junit.framework.Assert;
+
+import org.apache.hadoop.zebra.types.CGSchema;
+import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.parser.TableSchemaParser;
+import org.apache.hadoop.zebra.schema.Schema.ColumnSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStorageRecord2 {
+  String strSch = "r1:record(f1:int, f2:int), r2:record(f5:int, r3:record(f3:float, f4))";
+  TableSchemaParser parser;
+  Schema schema;
+
+  @Before
+  public void init() throws ParseException {
+    parser = new TableSchemaParser(new StringReader(strSch));
+    schema = parser.RecordSchema(null);
+  }
+
+  @Test
+  public void testStorageValid1() {
+    try {
+      String strStorage = "[r1.f1, r2.r3.f3, r2.f5]; [r1.f2, r2.r3.f4]";
+      Partition p = new Partition(schema.toString(), strStorage, null);
+      CGSchema[] cgschemas = p.getCGSchemas();
+
+      // 2 column group;
+      int size = cgschemas.length;
+      Assert.assertEquals(size, 2);
+      System.out.println("********** Column Groups **********");
+      for (int i = 0; i < cgschemas.length; i++) {
+        System.out.println(cgschemas[i]);
+        System.out.println("--------------------------------");
+      }
+      CGSchema cgs1 = cgschemas[0];
+      CGSchema cgs2 = cgschemas[1];
+
+      ColumnSchema f11 = cgs1.getSchema().getColumn(0);
+      Assert.assertEquals("r1.f1", f11.getName());
+      Assert.assertEquals(ColumnType.INT, f11.getType());
+      ColumnSchema f12 = cgs1.getSchema().getColumn(1);
+      Assert.assertEquals("r2.r3.f3", f12.getName());
+      Assert.assertEquals(ColumnType.FLOAT, f12.getType());
+
+      ColumnSchema f21 = cgs2.getSchema().getColumn(0);
+      Assert.assertEquals("r1.f2", f21.getName());
+      Assert.assertEquals(ColumnType.INT, f21.getType());
+      ColumnSchema f22 = cgs2.getSchema().getColumn(1);
+      Assert.assertEquals("r2.r3.f4", f22.getName());
+      Assert.assertEquals(ColumnType.BYTES, f22.getType());
+
+      System.out.println("*********** Column Map **********");
+      Map<String, HashSet<Partition.PartitionInfo.ColumnMappingEntry>> colmap = p
+          .getPartitionInfo().getColMap();
+      Assert.assertEquals(colmap.size(), 5);
+      Iterator<Map.Entry<String, HashSet<Partition.PartitionInfo.ColumnMappingEntry>>> it = colmap
+          .entrySet().iterator();
+      for (int i = 0; i < colmap.size(); i++) {
+        Map.Entry<String, HashSet<Partition.PartitionInfo.ColumnMappingEntry>> entry = (Map.Entry<String, HashSet<Partition.PartitionInfo.ColumnMappingEntry>>) it
+            .next();
+        String name = entry.getKey();
+        HashSet<Partition.PartitionInfo.ColumnMappingEntry> hs = entry
+            .getValue();
+        Iterator<Partition.PartitionInfo.ColumnMappingEntry> it1 = hs
+            .iterator();
+        for (int j = 0; j < hs.size(); j++) {
+          Partition.PartitionInfo.ColumnMappingEntry cme = (Partition.PartitionInfo.ColumnMappingEntry) it1
+              .next();
+          System.out.println("[Column = " + name + " CG = " + cme.getCGIndex()
+              + "." + cme.getFieldIndex() + "]");
+          if (i == 0 && j == 0) {
+            Assert.assertEquals(name, "r2.f5");
+            Assert.assertEquals(cme.getCGIndex(), 0);
+            Assert.assertEquals(cme.getFieldIndex(), 2);
+          } else if (i == 1 && j == 0) {
+            Assert.assertEquals(name, "r1.f1");
+            Assert.assertEquals(cme.getCGIndex(), 0);
+            Assert.assertEquals(cme.getFieldIndex(), 0);
+          } else if (i == 2 && j == 0) {
+            Assert.assertEquals(name, "r1.f2");
+            Assert.assertEquals(cme.getCGIndex(), 1);
+            Assert.assertEquals(cme.getFieldIndex(), 0);
+          } else if (i == 3 && j == 0) {
+            Assert.assertEquals(name, "r2.r3.f3");
+            Assert.assertEquals(cme.getCGIndex(), 0);
+            Assert.assertEquals(cme.getFieldIndex(), 1);
+          } else if (i == 4 && j == 0) {
+            Assert.assertEquals(name, "r2.r3.f4");
+            Assert.assertEquals(cme.getCGIndex(), 1);
+            Assert.assertEquals(cme.getFieldIndex(), 1);
+          }
+        }
+      }
+    } catch (Exception e) {
+      Assert.fail("Unexpected exception: " + e);
+    }
+  }
+
+  /*
+   * @Test public void testStorageInvalid1() { try { String strStorage =
+   * "m1#k1"; TableStorageParser parser = new TableStorageParser(new
+   * ByteArrayInputStream(strStorage.getBytes("UTF-8")), null, schema);
+   * ArrayList<CGSchema> schemas = parser.StorageSchema(); CGSchema cgs1 =
+   * schemas.get(0); } catch (Exception e) { String errMsg = e.getMessage();
+   * String str = "Encountered \" <IDENTIFIER> \"m1 \"\" at line 1, column 1.";
+   * System.out.println(errMsg); System.out.println(str);
+   * Assert.assertEquals(errMsg.startsWith(str), true); } }
+   */
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/bad_join.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/bad_join.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/bad_join.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/bad_join.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,23 @@
+register $zebraJar;
+--fs -rmr $outputDir
+
+
+--a1 = LOAD '$inputDir/unsorted1' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,byte2');
+--a2 = LOAD '$inputDir/unsorted2' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,byte2');
+
+--sort1 = order a1 by byte2;
+--sort2 = order a2 by byte2;
+
+--store sort1 into '$outputDir/100Msortedbyte21' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2];[byte2]');
+--store sort2 into '$outputDir/100Msortedbyte22' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2];[byte2]');
+
+rec1 = load '$outputDir/100Msortedbyte21' using org.apache.hadoop.zebra.pig.TableLoader('','sorted');
+rec2 = load '$outputDir/100Msortedbyte22' using org.apache.hadoop.zebra.pig.TableLoader('','sorted');
+
+joina = join rec1 by byte2, rec2 by byte2 using "merge" ;
+
+E = foreach joina  generate $0 as count,  $1 as seed,  $2 as int1,  $3 as str2, $4 as byte2;
+
+store E into '$outputDir/bad3' using org.apache.hadoop.zebra.pig.TableStorer('');
+
+

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/collecion4.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/collecion4.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/collecion4.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/collecion4.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,16 @@
+register /grid/0/dev/hadoopqa/jars/zebra.jar;
+
+a1 = LOAD '/data/SDS_HTable' USING org.apache.hadoop.zebra.pig.TableLoader('MLF_viewinfo');
+--limitedVals = LIMIT a1 10;
+--dump limitedVals;
+
+store a1 into '/data/collection_viewinfo1' using org.apache.hadoop.zebra.pig.TableStorer('[MLF_viewinfo]');    
+
+a2 = LOAD '/data/collection_viewinfo1' USING org.apache.hadoop.zebra.pig.TableLoader('MLF_viewinfo');
+--limitedVals = LIMIT a2 10;
+--dump limitedVals;
+                      
+
+store a2 into '/data/collection_viewinfo2' using org.apache.hadoop.zebra.pig.TableStorer('[MLF_viewinfo]');    
+
+             

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/config
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/config?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/config (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/config Fri Jan  8 18:17:07 2010
@@ -0,0 +1,5 @@
+zebraJar=/grid/0/dev/hadoopqa/jars/zebra.jar
+inputDir=/data/zebraStress/input
+outputDir=/data/zebraStress/output
+unsorted1=unsorted1
+unsorted2=unsorted2

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,18 @@
+register $zebraJar;
+--fs -rmr $outputDir
+
+
+
+rec1 = load '$outputDir/u1' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+rec2 = load '$outputDir/u2' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+
+
+joina = join rec1 by long1, rec2 by long1 using "merge" ;
+
+E = foreach joina  generate $0 as count,  $1 as seed,  $2 as int1,  $3 as str2, $4 as long1;
+joinE = order E by long1 parallel 25;
+
+
+
+store joinE into '$outputDir/j1' using org.apache.hadoop.zebra.pig.TableStorer('');
+                                                 

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join2.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join2.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join2.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join2.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,18 @@
+register $zebraJar;
+--fs -rmr $outputDir
+
+
+
+rec1 = load '$outputDir/u3' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+rec2 = load '$outputDir/u4' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+
+
+joina = join rec1 by long1, rec2 by long1 using "merge" ;
+
+E = foreach joina  generate $0 as count,  $1 as seed,  $2 as int1,  $3 as str2, $4 as long1;
+joinE = order E by long1 parallel 25;
+
+
+
+store joinE into '$outputDir/j2' using org.apache.hadoop.zebra.pig.TableStorer('');
+                                                 

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,39 @@
+register $zebraJar;
+--fs -rmr $outputDir
+
+
+a1 = LOAD '$inputDir/25Munsorted1' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+a2 = LOAD '$inputDir/25Munsorted2' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');    
+a3 = LOAD '$inputDir/25Munsorted3' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+a4 = LOAD '$inputDir/25Munsorted4' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+
+sort1 = order a1 by long1;
+sort2 = order a2 by long1;  
+sort3 = order a3 by long1;
+sort4 = order a4 by long1;
+
+store sort1 into '$outputDir/25Msorted1' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+store sort2 into '$outputDir/25Msorted2' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]'); 
+store sort3 into '$outputDir/25Msorted3' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+store sort4 into '$outputDir/25Msorted4' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');     
+                                                                                                                        
+
+joinl = LOAD '$outputDir/25Msorted1,$outputDir/25Msorted2' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+joinll = order joinl by long1; 
+store joinll into '$outputDir/unionl' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');          
+
+
+joinr = LOAD '$outputDir/25Msorted3,$outputDir/25Msorted4' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+joinrr = order joinr by long1; 
+store joinrr into '$outputDir/unionr' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+rec1 = load '$outputDir/unionl' using org.apache.hadoop.zebra.pig.TableLoader();
+rec2 = load '$outputDir/unionr' using org.apache.hadoop.zebra.pig.TableLoader();   
+
+
+joina = join rec1 by long1, rec2 by long1 using "merge" ;
+
+E = foreach joina  generate $0 as count,  $1 as seed,  $2 as int1,  $3 as str2, $4 as long1;
+
+store E into '$outputDir/join_after_union_1' using org.apache.hadoop.zebra.pig.TableStorer('');                  

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union10k.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union10k.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union10k.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union10k.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,43 @@
+register $zebraJar;
+--fs -rmr $outputDir
+
+
+a1 = LOAD '$inputDir/10k1' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+a2 = LOAD '$inputDir/10k2' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+a3 = LOAD '$inputDir/10k3' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+a4 = LOAD '$inputDir/10k4' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+
+sort1 = order a1 by long1 parallel 6;
+sort2 = order a2 by long1 parallel 5;
+sort3 = order a3 by long1 parallel 7;
+sort4 = order a4 by long1 parallel 4;
+
+store sort1 into '$outputDir/sortedlong110k1' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+store sort2 into '$outputDir/sortedlong110k2' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+store sort3 into '$outputDir/sortedlong110k3' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+store sort4 into '$outputDir/sortedlong110k4' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+joinl = LOAD '$outputDir/sortedlong110k1,$outputDir/sortedlong110k2' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+joinll = order joinl by long1 parallel 7;
+store joinll into '$outputDir/union10kl' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+joinr = LOAD '$outputDir/sortedlong110k3,$outputDir/sortedlong110k4' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+joinrr = order joinr by long1 parallel 4;
+store joinrr into '$outputDir/union10kr' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+rec1 = load '$outputDir/union10kl' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+rec2 = load '$outputDir/union10kr' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+
+
+joina = join rec1 by long1, rec2 by long1 using "merge" ;
+
+E = foreach joina  generate $0 as count,  $1 as seed,  $2 as int1,  $3 as str2, $4 as long1;
+joinE = order E by long1 parallel 25;
+
+limitedVals = LIMIT joina 10;
+dump limitedVals;
+
+store joinE into '$outputDir/join_after_union_10k' using org.apache.hadoop.zebra.pig.TableStorer('');  

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union2.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union2.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union2.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union2.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,39 @@
+register $zebraJar;
+--fs -rmr $outputDir
+
+
+--a1 = LOAD '$inputDir/25Munsorted1' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+--a2 = LOAD '$inputDir/25Munsorted2' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+--a3 = LOAD '$inputDir/25Munsorted3' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+--a4 = LOAD '$inputDir/25Munsorted4' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+
+--sort1 = order a1 by long1 parallel 6;
+--sort2 = order a2 by long1 parallel 5;
+--sort3 = order a3 by long1 parallel 7;
+--sort4 = order a4 by long1 parallel 4;
+
+--store sort1 into '$outputDir/25Msorted11' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+--store sort2 into '$outputDir/25Msorted21' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+--store sort3 into '$outputDir/25Msorted31' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+--store sort4 into '$outputDir/25Msorted41' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+joinl = LOAD '$outputDir/25Msorted11,$outputDir/25Msorted21' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+joinll = order joinl by long1 parallel 7;
+store joinll into '$outputDir/unionl1' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+joinr = LOAD '$outputDir/25Msorted31,$outputDir/25Msorted41' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+joinrr = order joinr by long1 parallel 4;
+store joinrr into '$outputDir/unionr1' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+rec1 = load '$outputDir/unionl1' using org.apache.hadoop.zebra.pig.TableLoader();
+rec2 = load '$outputDir/unionr1' using org.apache.hadoop.zebra.pig.TableLoader();
+
+
+joina = join rec1 by long1, rec2 by long1 using "merge" ;
+
+E = foreach joina  generate $0 as count,  $1 as seed,  $2 as int1,  $3 as str2, $4 as long1;
+joinE = order E by long1 parallel 25; 
+store joinE into '$outputDir/join_after_union_11' using org.apache.hadoop.zebra.pig.TableStorer(''); 

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union3.pig
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union3.pig?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union3.pig (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/stress/join_after_union3.pig Fri Jan  8 18:17:07 2010
@@ -0,0 +1,43 @@
+register $zebraJar;
+--fs -rmr $outputDir
+
+
+--a1 = LOAD '$inputDir/25Munsorted1' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+--a2 = LOAD '$inputDir/25Munsorted2' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+--a3 = LOAD '$inputDir/25Munsorted3' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+--a4 = LOAD '$inputDir/25Munsorted4' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1');
+
+--sort1 = order a1 by long1 parallel 6;
+--sort2 = order a2 by long1 parallel 5;
+--sort3 = order a3 by long1 parallel 7;
+--sort4 = order a4 by long1 parallel 4;
+
+--store sort1 into '$outputDir/25Msorted11' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+--store sort2 into '$outputDir/25Msorted21' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+--store sort3 into '$outputDir/25Msorted31' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+--store sort4 into '$outputDir/25Msorted41' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+--joinl = LOAD '$outputDir/25Msorted11,$outputDir/25Msorted21' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+--joinll = order joinl by long1 parallel 7;
+--store joinll into '$outputDir/unionl1' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+--joinr = LOAD '$outputDir/25Msorted31,$outputDir/25Msorted41' USING org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+--joinrr = order joinr by long1 parallel 4;
+--store joinrr into '$outputDir/unionr1' using org.apache.hadoop.zebra.pig.TableStorer('[count,seed,int1,str2,long1]');
+
+
+rec1 = load '$outputDir/unionl1' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+rec2 = load '$outputDir/unionr1' using org.apache.hadoop.zebra.pig.TableLoader('count,seed,int1,str2,long1', 'sorted');
+
+
+joina = join rec1 by long1, rec2 by long1 using "merge" ;
+
+E = foreach joina  generate $0 as count,  $1 as seed,  $2 as int1,  $3 as str2, $4 as long1;
+joinE = order E by long1 parallel 25;
+
+--limitedVals = LIMIT joina 10;
+--dump limitedVals;
+
+store joinE into '$outputDir/join_after_union_13' using org.apache.hadoop.zebra.pig.TableStorer('');             


