Author: daijy Date: Tue Jul 12 01:13:20 2011 New Revision: 1145418 URL: http://svn.apache.org/viewvc?rev=1145418&view=rev Log: PIG-1890: Fix piggybank unit test TestAvroStorage Modified: pig/trunk/CHANGES.txt pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1145418&r1=1145417&r2=1145418&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Tue Jul 12 01:13:20 2011 @@ -66,6 +66,8 @@ PIG-2011: Speed up TestTypedMap.java (dv BUG FIXES +PIG-1890: Fix piggybank unit test TestAvroStorage (kengoodhope via daijy) + PIG-2110: NullPointerException in piggybank.evaluation.util.apachelogparser.SearchTermExtractor (dale_jin via daijy) PIG-2144: ClassCastException when using IsEmpty(DIFF()) (thejas) Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1145418&r1=1145417&r2=1145418&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Tue Jul 12 01:13:20 2011 @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.HashSet; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.file.DataFileStream; @@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.pig.Expression; import org.apache.pig.FileInputLoadFunc; import org.apache.pig.LoadFunc; @@ -128,7 +130,9 @@ public class AvroStorage extends FileInp */ @Override public void setLocation(String location, Job job) throws IOException { - if(AvroStorageUtils.addInputPaths(location, job) && inputAvroSchema == null) { + HashSet paths = new HashSet(); + if(AvroStorageUtils.getAllSubDirs(new Path(location), job, paths) && inputAvroSchema == null) { + FileInputFormat.setInputPaths(job, paths.toArray(new Path[0])); inputAvroSchema = getAvroSchema(location, job); } } @@ -188,6 +192,8 @@ public class AvroStorage extends FileInp if (schema == null) System.err.println("Cannot get avro schema! Input path " + path + " might be empty."); + + System.err.println(schema.toString()); return schema; } @@ -211,6 +217,7 @@ public class AvroStorage extends FileInp DataFileStream avroDataStream = new DataFileStream(hdfsInputStream, avroReader); Schema ret = avroDataStream.getSchema(); avroDataStream.close(); + return ret; } @@ -267,6 +274,9 @@ public class AvroStorage extends FileInp /* convert to pig schema */ ResourceSchema pigSchema = AvroSchema2Pig.convert(inputAvroSchema); AvroStorageLog.details("pig input schema:" + pigSchema); + if (pigSchema.getFields().length == 1){ + pigSchema = pigSchema.getFields()[0].getSchema(); + } return pigSchema; } else return null; @@ -567,7 +577,7 @@ public class AvroStorage extends FileInp @Override public void putNext(Tuple t) throws IOException { try { - this.writer.write(NullWritable.get(), t); + this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t); } catch (InterruptedException e) { e.printStackTrace(); } Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1145418&r1=1145417&r2=1145418&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Tue Jul 12 01:13:20 2011 @@ -17,8 +17,11 @@ package org.apache.pig.piggybank.storage.avro; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -35,6 +38,8 @@ import org.apache.hadoop.mapreduce.lib.i import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.data.DataType; +import org.apache.pig.piggybank.storage.avro.AvroStorageLog; + /** * This is utility class for this package @@ -87,54 +92,51 @@ public class AvroStorageUtils { } /** - * get input paths to job config + * get input paths to job config */ public static boolean addInputPaths(String pathString, Job job) - throws IOException { - Configuration conf = job.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - - Path path = new Path(pathString); - FileStatus pathStatus = fs.getFileStatus(path); - - List input = new LinkedList(); - if (PATH_FILTER.accept(path)) { // remove input path with leading "." or "_" - input.add(pathStatus); - } - - boolean ret = false; - while (!input.isEmpty()) { - - FileStatus status = input.remove(0); - Path p = status.getPath(); - - if (!status.isDir() ) { - AvroStorageLog.details("Add input path:" + p); - FileInputFormat.addInputPath(job, p); - ret = true; - } - else { - /*list all sub-dirs*/ - FileStatus[] ss = fs.listStatus(p, PATH_FILTER); - - if (ss.length > 0) { - if (noDir(ss) ) { - AvroStorageLog.details("Add input path:" + p); - FileInputFormat.addInputPath(job, p); - ret = true; - } - else { - input.addAll(Arrays.asList(ss)); - ret = true; - } - - } - } - } + throws IOException + { + Configuration conf = job.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + HashSet paths = new HashSet(); + if (getAllSubDirs(new Path(pathString), job, paths)) + { + paths.addAll(Arrays.asList(FileInputFormat.getInputPaths(job))); + FileInputFormat.setInputPaths(job, paths.toArray(new Path[0])); + return true; + } + return false; - return ret; } + /** + * Adds all non-hidden directories and subdirectories to set param + * + * @throws IOException + */ + static boolean getAllSubDirs(Path path, Job job, Set paths) throws IOException { + FileSystem fs = FileSystem.get(job.getConfiguration()); + if (PATH_FILTER.accept(path)) { + try { + FileStatus file = fs.getFileStatus(path); + if (file.isDir()) { + for (FileStatus sub : fs.listStatus(path)) { + getAllSubDirs(sub.getPath(), job, paths); + } + } else { + AvroStorageLog.details("Add input file:" + file); + paths.add(file.getPath()); + } + } catch (FileNotFoundException e) { + AvroStorageLog.details("Input path does not exist: " + path); + return false; + } + return true; + } + return false; + } + /** check whether there is NO directory in the input file (status) list*/ public static boolean noDir(FileStatus [] ss) { for (FileStatus s : ss) { Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java?rev=1145418&r1=1145417&r2=1145418&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java Tue Jul 12 01:13:20 2011 @@ -54,15 +54,10 @@ public class PigSchema2Avro { ResourceFieldSchema[] pigFields = pigSchema.getFields(); /* remove the pig tuple wrapper */ - if (pigFields.length == 1 && AvroStorageUtils.isTupleWrapper(pigFields[0])) { + if (pigFields.length == 1) { AvroStorageLog.details("Ignore the pig tuple wrapper."); - ResourceFieldSchema[] listSchemas = pigFields[0].getSchema() - .getFields(); - if (listSchemas.length != 1) - throw new IOException("Expect one subfield from " + pigFields); - - return convert(listSchemas[0], nullable); + return convert(pigFields[0], nullable); } else return convertRecord(pigFields, nullable); } Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1145418&r1=1145417&r2=1145418&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original) +++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Tue Jul 12 01:13:20 2011 @@ -76,121 +76,116 @@ public class TestAvroStorage { } @Test - public void testDummy() { - // Dummy test, will remove after PIG-1890 check in. Otherwise, Junit will complain "No runnable methods" + public void testArrayDefault() throws IOException { + String output= outbasedir + "testArrayDefault"; + String expected = basedir + "expected_testArrayDefault.avro"; + + deleteDirectory(new File(output)); + + String [] queries = { + " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();" + }; + testAvroStorage( queries); + verifyResults(output, expected); + } + + + @Test + public void testArrayWithSchema() throws IOException { + String output= outbasedir + "testArrayWithSchema"; + String expected = basedir + "expected_testArrayWithSchema.avro"; + deleteDirectory(new File(output)); + String [] queries = { + " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " STORE in INTO '" + output + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " + + " 'schema', '{\"type\":\"array\",\"items\":\"float\"}' );" + }; + testAvroStorage( queries); + verifyResults(output, expected); + } + + @Test + public void testArrayWithNotNull() throws IOException { + String output= outbasedir + "testArrayWithNotNull"; + String expected = basedir + "expected_testArrayWithSchema.avro"; + deleteDirectory(new File(output)); + String [] queries = { + " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " STORE in INTO '" + output + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " + + " '{\"nullable\": false }' );" + }; + testAvroStorage( queries); + verifyResults(output, expected); + } + + @Test + public void testArrayWithSame() throws IOException { + String output= outbasedir + "testArrayWithSame"; + String expected = basedir + "expected_testArrayWithSchema.avro"; + deleteDirectory(new File(output)); + String [] queries = { + " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " STORE in INTO '" + output + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " + + " 'same', '" + testArrayFile + "' );" + }; + testAvroStorage(queries); + verifyResults(output, expected); + } + + @Test + public void testRecordWithSplit() throws IOException { + String output1= outbasedir + "testRecordSplit1"; + String output2= outbasedir + "testRecordSplit2"; + String expected1 = basedir + "expected_testRecordSplit1.avro"; + String expected2 = basedir + "expected_testRecordSplit2.avro"; + deleteDirectory(new File(output1)); + deleteDirectory(new File(output2)); + String [] queries = { + " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " groups = GROUP avro BY member_id;", + " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;", + " STORE sc INTO '" + output1 + "' " + + " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + + "'{\"index\": 1, " + + " \"schema\": {\"type\":\"record\", " + + " \"name\":\"result\", " + + " \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " + + "{\"name\":\"count\", \"type\":\"long\"} " + + "]" + + "}" + + " }');", + " STORE sc INTO '" + output2 + + " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');" + }; + testAvroStorage( queries); + verifyResults(output1, expected1); + verifyResults(output2, expected2); + } + + @Test + public void testRecordWithFieldSchema() throws IOException { + String output= outbasedir + "testRecordWithFieldSchema"; + String expected = basedir + "expected_testRecordWithFieldSchema.avro"; + deleteDirectory(new File(output)); + String [] queries = { + " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " avro1 = FILTER avro BY member_id > 1211;", + " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;", + " STORE avro2 INTO '" + output + "' " + + " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + + "'{\"data\": \"" + testRecordFile + "\" ," + + " \"field0\": \"int\", " + + " \"field1\": \"def:browser_id\", " + + " \"field3\": \"def:act_content\" " + + " }');" + }; + testAvroStorage( queries); + verifyResults(output, expected); } -// Comment out all test cases until PIG-1890 fixed -// @Test -// public void testArrayDefault() throws IOException { -// String output= outbasedir + "testArrayDefault"; -// String expected = basedir + "expected_testArrayDefault.avro"; -// -// deleteDirectory(new File(output)); -// -// String [] queries = { -// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", -// " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();" -// }; -// testAvroStorage( queries); -// verifyResults(output, expected); -// } -// -// -// @Test -// public void testArrayWithSchema() throws IOException { -// String output= outbasedir + "testArrayWithSchema"; -// String expected = basedir + "expected_testArrayWithSchema.avro"; -// deleteDirectory(new File(output)); -// String [] queries = { -// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", -// " STORE in INTO '" + output + -// "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " + -// " 'schema', '{\"type\":\"array\",\"items\":\"float\"}' );" -// }; -// testAvroStorage( queries); -// verifyResults(output, expected); -// } -// -// @Test -// public void testArrayWithNotNull() throws IOException { -// String output= outbasedir + "testArrayWithNotNull"; -// String expected = basedir + "expected_testArrayWithSchema.avro"; -// deleteDirectory(new File(output)); -// String [] queries = { -// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", -// " STORE in INTO '" + output + -// "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " + -// " '{\"nullable\": false }' );" -// }; -// testAvroStorage( queries); -// verifyResults(output, expected); -// } -// -// @Test -// public void testArrayWithSame() throws IOException { -// String output= outbasedir + "testArrayWithSame"; -// String expected = basedir + "expected_testArrayWithSchema.avro"; -// deleteDirectory(new File(output)); -// String [] queries = { -// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", -// " STORE in INTO '" + output + -// "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " + -// " 'same', '" + testArrayFile + "' );" -// }; -// testAvroStorage(queries); -// verifyResults(output, expected); -// } -// -// @Test -// public void testRecordWithSplit() throws IOException { -// String output1= outbasedir + "testRecordSplit1"; -// String output2= outbasedir + "testRecordSplit2"; -// String expected1 = basedir + "expected_testRecordSplit1.avro"; -// String expected2 = basedir + "expected_testRecordSplit2.avro"; -// deleteDirectory(new File(output1)); -// deleteDirectory(new File(output2)); -// String [] queries = { -// " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", -// " groups = GROUP avro BY member_id;", -// " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;", -// " STORE sc INTO '" + output1 + "' " + -// " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + -// "'{\"index\": 1, " + -// " \"schema\": {\"type\":\"record\", " + -// " \"name\":\"result\", " + -// " \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " + -// "{\"name\":\"count\", \"type\":\"long\"} " + -// "]" + -// "}" + -// " }');", -// " STORE sc INTO '" + output2 + -// " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');" -// }; -// testAvroStorage( queries); -// verifyResults(output1, expected1); -// verifyResults(output2, expected2); -// } -// -// @Test -// public void testRecordWithFieldSchema() throws IOException { -// String output= outbasedir + "testRecordWithFieldSchema"; -// String expected = basedir + "expected_testRecordWithFieldSchema.avro"; -// deleteDirectory(new File(output)); -// String [] queries = { -// " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", -// " avro1 = FILTER avro BY member_id > 1211;", -// " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;", -// " STORE avro2 INTO '" + output + "' " + -// " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + -// "'{\"data\": \"" + testRecordFile + "\" ," + -// " \"field0\": \"int\", " + -// " \"field1\": \"def:browser_id\", " + -// " \"field3\": \"def:act_content\" " + -// " }');" -// }; -// testAvroStorage( queries); -// verifyResults(output, expected); -// } private static void deleteDirectory (File path) { if ( path.exists()) {