Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6844F9BC0 for ; Thu, 13 Sep 2012 21:17:40 +0000 (UTC) Received: (qmail 39574 invoked by uid 500); 13 Sep 2012 21:17:40 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 39545 invoked by uid 500); 13 Sep 2012 21:17:40 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 39535 invoked by uid 99); 13 Sep 2012 21:17:40 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Sep 2012 21:17:40 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Sep 2012 21:17:38 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4F3F1238897F for ; Thu, 13 Sep 2012 21:16:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1384536 - in /pig/trunk/contrib: ./ piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ piggybank/java/src/test/java/org/apache/pig/piggybank/test/sto... Date: Thu, 13 Sep 2012 21:16:55 -0000 To: commits@pig.apache.org From: gates@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120913211655.4F3F1238897F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gates Date: Thu Sep 13 21:16:54 2012 New Revision: 1384536 URL: http://svn.apache.org/viewvc?rev=1384536&view=rev Log: PIG-2909 Add a new option for ignoring corrupted files to AvroStorage load func Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro (with props) pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro (with props) Modified: pig/trunk/contrib/CHANGES.txt pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Modified: pig/trunk/contrib/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1384536&r1=1384535&r2=1384536&view=diff ============================================================================== --- pig/trunk/contrib/CHANGES.txt (original) +++ pig/trunk/contrib/CHANGES.txt Thu Sep 13 21:16:54 2012 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-2909 Add a new option for ignoring corrupted files to AvroStorage load func (cheolsoo via gates) + PIG-2202 AvroStorage doesn't work with Avro 1.5.1 (billgraham via gates) PIG-1959 Penny: a framework for workflow instrumentation (olston, breed via gates) Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1384536&r1=1384535&r2=1384536&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Sep 13 21:16:54 2012 @@ -88,6 +88,7 @@ public class AvroStorage extends FileInp private Schema inputAvroSchema = null; /* input avro schema */ private boolean checkSchema = true; /*whether check schema of input directories*/ + private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */ /** * Empty constructor. Output schema is derived from pig schema. @@ -112,9 +113,11 @@ public class AvroStorage extends FileInp nullable = true; checkSchema = true; - if (parts.length == 1 && !parts[0].equalsIgnoreCase("no_schema_check")) { - /* If one parameter is given, and that is not 'no_schema_check', - * then it must be a json string. + if (parts.length == 1 + && !parts[0].equalsIgnoreCase("no_schema_check") + && !parts[0].equalsIgnoreCase("ignore_bad_files")) { + /* If one parameter is given, and that is neither 'no_schema_check' + * nor 'ignore_bad_files', then it must be a json string. */ init(parseJsonString(parts[0])); } else { @@ -259,7 +262,7 @@ public class AvroStorage extends FileInp public InputFormat getInputFormat() throws IOException { AvroStorageLog.funcCall("getInputFormat"); if(inputAvroSchema != null) - return new PigAvroInputFormat(inputAvroSchema); + return new PigAvroInputFormat(inputAvroSchema, ignoreBadFiles); else return new TextInputFormat(); } @@ -381,6 +384,10 @@ public class AvroStorage extends FileInp checkSchema = false; /* parameter only, so increase iteration counter by 1 */ i += 1; + } else if (name.equalsIgnoreCase("ignore_bad_files")) { + ignoreBadFiles = true; + /* parameter only, so increase iteration counter by 1 */ + i += 1; } else { String value = parts[i+1].trim(); if (name.equalsIgnoreCase("debug") @@ -627,6 +634,11 @@ public class AvroStorage extends FileInp } @Override + public void cleanupOnSuccess(String location, Job job) throws IOException { + // Nothing to do + } + + @Override public void putNext(Tuple t) throws IOException { try { this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t); @@ -634,9 +646,4 @@ public class AvroStorage extends FileInp e.printStackTrace(); } } - - @Override - public void cleanupOnSuccess(String location, Job job) throws IOException{ - - } } Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1384536&r1=1384535&r2=1384536&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Thu Sep 13 21:16:54 2012 @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.lib.i public class PigAvroInputFormat extends FileInputFormat { private Schema schema = null; /* avro schema */ + private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */ /** * empty constructor @@ -46,11 +47,13 @@ public class PigAvroInputFormat extends } /** - * constructor called by AvroStorage to pass in schema - * @param s input data schema + * constructor called by AvroStorage to pass in schema and ignoreBadFiles. + * @param schema input data schema + * @param ignoreBadFiles whether ignore corrupted files during load */ - public PigAvroInputFormat(Schema s) { - schema = s; + public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles) { + this.schema = schema; + this.ignoreBadFiles = ignoreBadFiles; } /** @@ -63,7 +66,7 @@ public class PigAvroInputFormat extends createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { context.setStatus(split.toString()); - return new PigAvroRecordReader(context, (FileSplit) split, schema); + return new PigAvroRecordReader(context, (FileSplit) split, schema, ignoreBadFiles); } } Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1384536&r1=1384535&r2=1384536&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Thu Sep 13 21:16:54 2012 @@ -18,8 +18,12 @@ package org.apache.pig.piggybank.storage.avro; import java.io.IOException; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; @@ -36,17 +40,22 @@ import org.apache.pig.data.TupleFactory; */ public class PigAvroRecordReader extends RecordReader { + private static final Log LOG = LogFactory.getLog(PigAvroRecordReader.class); + private AvroStorageInputStream in; private DataFileReader reader; /*reader of input avro data*/ private long start; private long end; + private Path path; + private boolean ignoreBadFiles; /** * constructor to initialize input and avro data reader */ public PigAvroRecordReader(TaskAttemptContext context, FileSplit split, - Schema schema) throws IOException { - this.in = new AvroStorageInputStream(split.getPath(), context); + Schema schema, boolean ignoreBadFiles) throws IOException { + this.path = split.getPath(); + this.in = new AvroStorageInputStream(path, context); if(schema == null) throw new IOException("Need to provide input avro schema"); @@ -59,6 +68,7 @@ public class PigAvroRecordReader extends this.reader.sync(split.getStart()); // sync to start this.start = in.tell(); this.end = split.getStart() + split.getLength(); + this.ignoreBadFiles = ignoreBadFiles; } @Override @@ -108,10 +118,21 @@ public class PigAvroRecordReader extends @Override public boolean nextKeyValue() throws IOException, InterruptedException { - if (!reader.hasNext() || reader.pastSync(end)) - return false; - - return true; + try { + if (!reader.hasNext() || reader.pastSync(end)) { + return false; + } + return true; + } catch (AvroRuntimeException e) { + if (ignoreBadFiles) { + // For currupted files, AvroRuntimeException can be thrown. + // We ignore them if the option 'ignore_bad_files' is enabled. + LOG.warn("Ignoring bad file '" + path + "'."); + return false; + } else { + throw e; + } + } } } Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1384536&r1=1384535&r2=1384536&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original) +++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Thu Sep 13 21:16:54 2012 @@ -158,6 +158,7 @@ public class TestAvroStorage { " } ]" + " } ]" + " }"; + final private String testCorruptedFile = getInputFile("test_corrupted_file.avro"); @BeforeClass public static void setup() throws ExecException { @@ -841,6 +842,35 @@ public class TestAvroStorage { verifyResults(output, expected); } + @Test + public void testCorruptedFile1() throws IOException { + // Verify that load fails when bad files are found if ignore_bad_files is disabled. + String output = outbasedir + "testCorruptedFile1"; + deleteDirectory(new File(output)); + String [] queries = { + " in = LOAD '" + testCorruptedFile + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();" + }; + // Job is expected to fail for bad files. + testAvroStorage(true, queries); + } + + @Test + public void testCorruptedFile2() throws IOException { + // Verify that corrupted files are skipped if ignore_bad_files is enabled. + // Output is expected to be empty. + String output = outbasedir + "testCorruptedFile2"; + String expected = basedir + "expected_testCorruptedFile.avro"; + deleteDirectory(new File(output)); + String [] queries = { + " in = LOAD '" + testCorruptedFile + "'" + + " USING org.apache.pig.piggybank.storage.avro.AvroStorage ('ignore_bad_files');", + " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();" + }; + testAvroStorage(queries); + verifyResults(output, expected); + } + private static void deleteDirectory (File path) { if ( path.exists()) { File [] files = path.listFiles(); Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro?rev=1384536&view=auto ============================================================================== Binary file - no diff available. Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro?rev=1384536&view=auto ============================================================================== Binary file - no diff available. Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream