pig-commits mailing list archives

From y...@apache.org
Subject svn commit: r895753 - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/pig/ src/test/org/apache/hadoop/zebra/pig/
Date Mon, 04 Jan 2010 19:33:55 GMT
Author: yanz
Date: Mon Jan  4 19:33:53 2010
New Revision: 895753

URL: http://svn.apache.org/viewvc?rev=895753&view=rev
Log:
PIG-1167: Hadoop file glob support (yanz)
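
For context, this change lets the path given to TableLoader contain Hadoop file glob patterns (e.g. brace alternation such as {a,b}), which are expanded via FileSystem.globStatus before the tables are opened. A minimal usage sketch, with hypothetical paths and projection columns (the tests added below do essentially this against a mini cluster):

    // Illustrative only; the table path and column names are assumptions.
    PigServer pig = new PigServer(ExecType.LOCAL);
    pig.registerQuery("records = LOAD '/user/alice/events{2009,2010}' "
        + "USING org.apache.hadoop.zebra.pig.TableLoader('int1, str1');");
    Iterator<Tuple> it = pig.openIterator("records");
    while (it.hasNext())
      System.out.println(it.next());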

Added:
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon Jan  4 19:33:53 2010
@@ -52,6 +52,8 @@
 
   BUG FIXES
 
+    PIG-1167: Hadoop file glob support (yanz)
+
     PIG-1153: Record split exception fix (yanz)
 
     PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Mon Jan  4 19:33:53 2010
@@ -226,7 +226,7 @@
 				FileSystem fs = p.getFileSystem(jobConf);
 				FileStatus[] matches = fs.globStatus(p);
 				if (matches == null) {
-					LOG.warn("Input path does not exist: " + p);
+					throw new IOException("Input path does not exist: " + p);
 				}
 				else if (matches.length == 0) {
 					LOG.warn("Input Pattern " + p + " matches 0 files");
@@ -293,33 +293,14 @@
 		
 		Projection projection;
 
-		if (!fileName.contains(",")) { // one table;
-			org.apache.hadoop.zebra.schema.Schema tschema = BasicTable.Reader.getSchema(new Path(fileName), jobConf);
-			try {
-				projection = new org.apache.hadoop.zebra.types.Projection(tschema, TableInputFormat.getProjection(jobConf));
-				projectionSchema = projection.getProjectionSchema();
-			} catch (ParseException e) {
-				throw new IOException("Schema parsing failed : "+e.getMessage());
-			}
-		} else { // table union;
-			org.apache.hadoop.zebra.schema.Schema unionSchema = new org.apache.hadoop.zebra.schema.Schema();
-			for (Path p : paths) {
-				org.apache.hadoop.zebra.schema.Schema schema = BasicTable.Reader.getSchema(p, jobConf);
-				try {
-					unionSchema.unionSchema(schema);
-				} catch (ParseException e) {
-					throw new IOException(e.getMessage());
-				}
-			}
-			
-			try {
-				projection = new org.apache.hadoop.zebra.types.Projection(unionSchema, TableInputFormat.getProjection(jobConf));
-				projectionSchema = projection.getProjectionSchema();
-			} catch (ParseException e) {
-				throw new IOException("Schema parsing failed : "+e.getMessage());
-			}
-		}		
-    
+    org.apache.hadoop.zebra.schema.Schema tschema = TableInputFormat.getSchema(jobConf);
+    try {
+      projection = new org.apache.hadoop.zebra.types.Projection(tschema, TableInputFormat.getProjection(jobConf));
+      projectionSchema = projection.getProjectionSchema();
+    } catch (ParseException e) {
+      throw new IOException("Schema parsing failed : "+e.getMessage());
+    }
+
 		if (projectionSchema == null) {
 			throw new IOException("Cannot determine table projection schema");
 		}
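
For context on the two hunks above: FileSystem.globStatus returns null when a non-glob input path does not exist, and a zero-length array when a glob pattern matches no files. The first hunk turns the former from a warning into a hard IOException while leaving the latter a warning; the second drops the single-table versus comma-separated-union branching and reads the (possibly unioned) schema from TableInputFormat.getSchema instead. A minimal sketch of the globStatus contract the new check relies on, with illustrative paths and assuming a Configuration conf in scope:

    FileSystem fs = FileSystem.get(conf);
    // Absent non-glob path: globStatus returns null -> now an IOException.
    FileStatus[] missing = fs.globStatus(new Path("/no/such/table"));
    // Glob pattern matching nothing: empty array -> still only a warning.
    FileStatus[] empty = fs.globStatus(new Path("/data/part-{x,y}"));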

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java?rev=895753&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java Mon Jan  4 19:33:53 2010
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestGlobTableLoader{
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+  private static Configuration conf;
+  private static String zebraJar;
+  private static String whichCluster;
+  private static FileSystem fs;
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    // if whichCluster is not defined, or defined something other than
+    // "realCluster" or "miniCluster", set it to "miniCluster"
+    if (System.getProperty("whichCluster") == null
+        || ((!System.getProperty("whichCluster")
+            .equalsIgnoreCase("realCluster")) && (!System.getProperty(
+            "whichCluster").equalsIgnoreCase("miniCluster")))) {
+      System.setProperty("whichCluster", "miniCluster");
+      whichCluster = System.getProperty("whichCluster");
+    } else {
+      whichCluster = System.getProperty("whichCluster");
+    }
+
+    System.out.println("cluster: " + whichCluster);
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("HADOOP_HOME") == null) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(0);
+    }
+
+    conf = new Configuration();
+
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("USER") == null) {
+      System.out.println("Please set USER");
+      System.exit(0);
+    }
+    zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+    File file = new File(zebraJar);
+    if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Please put zebra.jar at hadoop_home/../jars");
+      System.exit(0);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+          .toProperties(conf));
+      pigServer.registerJar(zebraJar);
+      pathTable = new Path("/user/" + System.getenv("USER")
+          + "/TestMapTableLoader");
+      removeDir(pathTable);
+      fs = pathTable.getFileSystem(conf);
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+        pathTable = new Path(fs.getWorkingDirectory()
+            + "/TestMapTableLoader1");
+        removeDir(pathTable);
+        System.out.println("path1 =" + pathTable);
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+
+
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "m1:map(string)", "[m1#{a}]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("a", "x");
+        map.put("b", "y");
+        map.put("c", "z");
+        tuple.set(0, map);
+
+        try {
+          inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+        } catch (Exception e) {
+          System.out.println(e.getMessage());
+        }
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+  public static void removeDir(Path outPath) throws IOException {
+    String command = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+    command = System.getenv("HADOOP_HOME") +"/bin/hadoop fs -rmr " + outPath.toString();
+    }
+    else{
+    command = "rm -rf " + outPath.toString();
+    }
+    Runtime runtime = Runtime.getRuntime();
+    Process proc = runtime.exec(command);
+    int exitVal = -1;
+    try {
+      exitVal = proc.waitFor();
+    } catch (InterruptedException e) {
+      System.err.println(e);
+    }
+    
+  }
+
+  // @Test
+  public void test1() throws IOException, ParseException {
+    String projection = new String("m1#{b}");
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+    // HashMap<String, Object> mapval;
+    while (!scanner.atEnd()) {
+      scanner.getKey(key);
+      // Assert.assertEquals(key, new BytesWritable("key0".getBytes()));
+      scanner.getValue(value);
+      System.out.println("key = " + key + " value = " + value);
+
+      // mapval = (HashMap<String, Object>) value.get(0);
+      // Assert.assertEquals("x", mapval.get("a"));
+      // Assert.assertEquals(null, mapval.get("b"));
+      // Assert.assertEquals(null, mapval.get("c"));
+      scanner.advance();
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testReader() throws ExecException, IOException {
+    pathTable = new Path("/user/" + System.getenv("USER")
+        + "/{TestMapTableLoader1}");
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{a}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+    }
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=895753&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java Mon Jan  4 19:33:53 2010
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestOrderPreserveMultiTableGlob {
+	
+	final static int NUMB_TABLE = 10;  		// number of tables for stress test
+	final static int NUMB_TABLE_ROWS = 5;	// number of rows for each table
+	
+	final static String TABLE_SCHEMA = "int1:int,str1:string,byte1:bytes";
+	final static String TABLE_STORAGE = "[int1,str1,byte1]";
+	
+	static int fileId = 0;
+	static int sortId = 0;
+	
+	protected static ExecType execType = ExecType.MAPREDUCE;
+	private static MiniCluster cluster;
+	protected static PigServer pigServer;
+	protected static ExecJob pigJob;
+	
+	private static ArrayList<Path> pathTables;
+	private static int totalTableRows =0;
+	
+	private static Configuration conf;
+	private static FileSystem fs;
+	
+	private static String zebraJar;
+	private static String whichCluster;
+	
+	@BeforeClass
+	public static void setUp() throws Exception {
+		if (System.getProperty("hadoop.log.dir") == null) {
+			String base = new File(".").getPath(); // getAbsolutePath();
+			System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+		}
+		
+		// if whichCluster is not defined, or defined something other than
+		// "realCluster" or "miniCluster", set it to "miniCluster"
+		if (System.getProperty("whichCluster") == null
+				|| ((!System.getProperty("whichCluster")
+						.equalsIgnoreCase("realCluster")) && (!System.getProperty(
+						"whichCluster").equalsIgnoreCase("miniCluster")))) {
+			System.setProperty("whichCluster", "miniCluster");
+			whichCluster = System.getProperty("whichCluster");
+		} else {
+			whichCluster = System.getProperty("whichCluster");
+		}
+		
+		System.out.println("cluster: " + whichCluster);
+		if (whichCluster.equalsIgnoreCase("realCluster")
+				&& System.getenv("HADOOP_HOME") == null) {
+			System.out.println("Please set HADOOP_HOME");
+			System.exit(0);
+		}
+		
+		conf = new Configuration();
+		
+		if (whichCluster.equalsIgnoreCase("realCluster")
+				&& System.getenv("USER") == null) {
+			System.out.println("Please set USER");
+			System.exit(0);
+		}
+		zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+		File file = new File(zebraJar);
+		if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+			System.out.println("Please put zebra.jar at hadoop_home/../jars");
+			System.exit(0);
+		}
+		
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+					.toProperties(conf));
+			pigServer.registerJar(zebraJar);
+			
+			pathTables = new ArrayList<Path>();
+			for (int i=0; i<NUMB_TABLE; ++i) {
+				Path pathTable = new Path("/user/" + System.getenv("USER")
+						+ "/TestOrderPreserveMultiTable" + i);
+				pathTables.add(pathTable);
+				removeDir(pathTable);
+			}
+			fs = pathTables.get(0).getFileSystem(conf);
+		}
+		
+		if (whichCluster.equalsIgnoreCase("miniCluster")) {
+			if (execType == ExecType.MAPREDUCE) {
+				cluster = MiniCluster.buildCluster();
+				pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+				fs = cluster.getFileSystem();
+				
+				pathTables = new ArrayList<Path>();
+				for (int i=0; i<NUMB_TABLE; ++i) {
+					Path pathTable = new Path(fs.getWorkingDirectory()
+							+ "/TestOrderPreserveMultiTable" + i);
+					pathTables.add(pathTable);
+					removeDir(pathTable);
+				}
+			} else {
+				pigServer = new PigServer(ExecType.LOCAL);
+			}
+		}
+		
+		// Create tables
+		for (int i=0; i<NUMB_TABLE; ++i) {
+			// Create table data
+			Object[][] table = new Object[NUMB_TABLE_ROWS][3];  // three columns
+			
+			for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
+				table[j][0] = i;
+				table[j][1] = new String("string" + j);
+				table[j][2] = new DataByteArray("byte" + (NUMB_TABLE_ROWS - j));
+				++totalTableRows;
+			}
+			// Create table
+			createTable(pathTables.get(i), TABLE_SCHEMA, TABLE_STORAGE, table);
+			
+			// Load Table
+			String query = "table" + i + " = LOAD '" + pathTables.get(i).toString() + 
+					"' USING org.apache.hadoop.zebra.pig.TableLoader();";
+			pigServer.registerQuery(query);
+		}
+	}
+	
+	private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+			throws IOException {
+		//
+		// Create table from tableData array
+		//
+		BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+		
+		Schema schema = writer.getSchema();
+		Tuple tuple = TypesUtils.createTuple(schema);
+		TableInserter inserter = writer.getInserter("ins", false);
+		
+		for (int i = 0; i < tableData.length; ++i) {
+			TypesUtils.resetTuple(tuple);
+			for (int k = 0; k < tableData[i].length; ++k) {
+				tuple.set(k, tableData[i][k]);
+				System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+			}
+			inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+		}
+		inserter.close();
+		writer.close();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pigServer.shutdown();
+	}
+	
+	public static void removeDir(Path outPath) throws IOException {
+		String command = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			command = System.getenv("HADOOP_HOME") +"/bin/hadoop fs -rmr " + outPath.toString();
+		}
+		else{
+			command = "rm -rf " + outPath.toString();
+		}
+		Runtime runtime = Runtime.getRuntime();
+		Process proc = runtime.exec(command);
+		int exitVal = -1;
+		try {
+			exitVal = proc.waitFor();
+		} catch (InterruptedException e) {
+			System.err.println(e);
+		}
+	}
+	
+	private Iterator<Tuple> testOrderPreserveUnion(ArrayList<String> inputTables, String sortkey, String columns)
+				throws IOException {
+		//
+		// Test order preserve union from input tables and provided output columns
+		//
+		Assert.assertTrue("Table union requires two or more input tables", inputTables.size() >= 2);
+		
+		Path newPath = new Path(getCurrentMethodName());
+		ArrayList<String> pathList = new ArrayList<String>();
+		
+		// Load and store each of the input tables
+		for (int i=0; i<inputTables.size(); ++i) {
+			String tablename = inputTables.get(i);
+			String sortName = "sort" + ++sortId;
+			
+			// Sort tables
+			String orderby = sortName + " = ORDER " + tablename + " BY " + sortkey + " ;";
+			pigServer.registerQuery(orderby);
+			
+			String sortPath = new String(newPath.toString() + ++fileId);  // increment fileId suffix
+			
+			// Store sorted tables
+			pigJob = pigServer.store(sortName, sortPath, TableStorer.class.getCanonicalName() +
+				"('" + TABLE_STORAGE + "')");
+			Assert.assertNull(pigJob.getException());
+			
+			pathList.add(sortPath);  // add table path to list
+		}
+		
+		String paths = new String();
+		
+    paths += newPath.toString() + "{";
+    fileId = 0;
+		for (String path:pathList)
+			paths += ++fileId + ",";
+		paths = paths.substring(0, paths.lastIndexOf(","));  // remove trailing comma
+    paths += "}";
+		
+		String queryLoad = "records1 = LOAD '"
+	        + paths
+	        +	"' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
+		
+		System.out.println("queryLoad: " + queryLoad);
+		pigServer.registerQuery(queryLoad);
+		
+		// Return iterator
+		Iterator<Tuple> it1 = pigServer.openIterator("records1");
+		return it1;
+	}
+	
+	@Test
+	public void test_sorted_union_multi_table() throws ExecException, IOException {
+		//
+		// Test sorted union
+		//
+		
+		// Create input tables for order preserve union
+		ArrayList<String> inputTables = new ArrayList<String>();  // Input tables
+		for (int i=0; i<NUMB_TABLE; ++i) {
+			inputTables.add("table" + i);  // add input table
+		}
+		
+		// Test with input tables and provided output columns
+		testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1");
+		
+		// Create results table for verification
+		ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+		for (int i=0; i<NUMB_TABLE; ++i) {
+			for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
+				ArrayList<Object> resultRow = new ArrayList<Object>();
+				
+				resultRow.add(i);	// int1
+				resultRow.add(new String("string" + j));	// str1
+				resultRow.add(new DataByteArray("byte" + (NUMB_TABLE_ROWS - j)));	// byte1
+				
+				resultTable.add(resultRow);
+			}
+		}
+		
+		// Verify union table
+		Iterator<Tuple> it = pigServer.openIterator("records1");
+		int numbRows = verifyTable(resultTable, 0, it);
+		
+		Assert.assertEquals(totalTableRows, numbRows);
+		
+		// Print Table
+		//printTable("records1");
+	}
+	
+	/**
+	 * Verify union output table with expected results
+	 * 
+	 */
+	private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException {
+		int numbRows = 0;
+		int index = 0;
+		Object value = resultTable.get(index).get(keyColumn);  // get value of primary key
+		
+		while (it.hasNext()) {
+			Tuple rowValues = it.next();
+			
+			// If the last primary sort key does not match, search for the next matching key
+			if (! compareObj(value, rowValues.get(keyColumn))) {
+				int subIndex = index + 1;
+				while (subIndex < resultTable.size()) {
+					if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) {  // found new key
+						index = subIndex;
+						value = resultTable.get(index).get(keyColumn);
+						break;
+					}
+					++subIndex;
+				}
+				Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
+					+ rowValues.get(keyColumn), value, rowValues.get(keyColumn));
+			}
+			// Search for matching row with this primary key
+			int subIndex = index;
+			
+			while (subIndex < resultTable.size()) {
+				// Compare row
+				ArrayList<Object> resultRow = resultTable.get(subIndex);
+				if ( compareRow(rowValues, resultRow) )
+					break; // found matching row
+				++subIndex;
+				Assert.assertEquals("Table comparison error for row : " + numbRows + " - no matching row found for : "
+					+ rowValues.get(keyColumn), value, resultTable.get(subIndex).get(keyColumn));
+			}
+			++numbRows;
+		}
+		Assert.assertEquals(resultTable.size(), numbRows);  // verify expected row count
+		return numbRows;
+	}
+	
+	/**
+	 * Compare table rows
+	 * 
+	 */
+	private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
+		boolean result = true;
+		Assert.assertEquals(resultRow.size(), rowValues.size());
+		for (int i = 0; i < rowValues.size(); ++i) {
+			if (! compareObj(rowValues.get(i), resultRow.get(i)) ) {
+				result = false;
+				break;
+			}
+		}
+		return result;
+	}
+	
+	/**
+	 * Compare table values
+	 * 
+	 */
+	private boolean compareObj(Object object1, Object object2) {
+		if (object1 == null) {
+			if (object2 == null)
+				return true;
+			else
+				return false;
+		} else if (object1.equals(object2))
+			return true;
+		else
+			return false;
+	}
+	
+	/**
+	 * Print Pig Table (for debugging)
+	 * 
+	 */
+	private int printTable(String tablename) throws IOException {
+		Iterator<Tuple> it1 = pigServer.openIterator(tablename);
+		int numbRows = 0;
+		while (it1.hasNext()) {
+			Tuple RowValue1 = it1.next();
+			++numbRows;
+			System.out.println();
+			for (int i = 0; i < RowValue1.size(); ++i)
+				System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + RowValue1.get(i));
+		}
+		System.out.println("\nRow count : " + numbRows);
+		return numbRows;
+	}
+	
+	/**
+	 * Return the name of the routine that called getCurrentMethodName
+	 * 
+	 */
+	private String getCurrentMethodName() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintWriter pw = new PrintWriter(baos);
+		(new Throwable()).printStackTrace(pw);
+		pw.flush();
+		String stackTrace = baos.toString();
+		pw.close();
+		
+		StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+		tok.nextToken(); // 'java.lang.Throwable'
+		tok.nextToken(); // 'at ...getCurrentMethodName'
+		String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+		// Parse line 3
+		tok = new StringTokenizer(l.trim(), " <(");
+		String t = tok.nextToken(); // 'at'
+		t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+		return t;
+	}
+	
+}
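
For reference, testOrderPreserveUnion above stores each sorted table under a common base path with numeric suffixes and then issues a single glob load rather than a comma-separated path list. Assuming the base path resolves to test_sorted_union_multi_table (the helper actually yields the fully qualified caller name; shortened here) and NUMB_TABLE is 10, the generated queryLoad string comes out roughly as:

    records1 = LOAD 'test_sorted_union_multi_table{1,2,3,4,5,6,7,8,9,10}'
        USING org.apache.hadoop.zebra.pig.TableLoader('int1, str1, byte1', 'sorted');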

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java Mon Jan  4 19:33:53 2010
@@ -455,7 +455,7 @@
 		} finally {
 			//System.out.println(getStackTrace(exception));
 			Assert.assertNotNull(exception);
-			Assert.assertTrue(getStackTrace(exception).contains("Schema file doesn't exist"));
+      Assert.assertTrue(getStackTrace(exception).contains("Input path does not exist: "));
 		}
 	}
 	
@@ -465,6 +465,7 @@
 		// Test sorted union error handling when one of the table paths is invalid (Negative test)
 		//
 		IOException exception = null;
+		String pathSort2 = null;
 		
 		try {
 			// Sort tables
@@ -479,7 +480,7 @@
 				"('" + TABLE1_STORAGE + "')");
 			Assert.assertNull(pigJob.getException());
 			
-			String pathSort2 = newPath.toString() + "2";  // invalid path
+			pathSort2 = newPath.toString() + "2";  // invalid path
 			
 			String queryLoad = "records1 = LOAD '"
 		        + pathSort1 + ","


