Subject: svn commit: r1379476 - in /incubator/hcatalog/trunk: CHANGES.txt ivy.xml src/java/org/apache/hcatalog/common/HCatConstants.java src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
Date: Fri, 31 Aug 2012 15:18:06 -0000
To: hcatalog-commits@incubator.apache.org
From: travis@apache.org
Reply-To: hcatalog-dev@incubator.apache.org
Message-Id: <20120831151806.F3D2023888E4@eris.apache.org>

Author: travis
Date: Fri Aug 31 15:18:06 2012
New Revision: 1379476

URL: http://svn.apache.org/viewvc?rev=1379476&view=rev
Log:
HCATALOG-487 HCatalog should tolerate a user-defined amount of bad records

Added:
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/ivy.xml
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1379476&r1=1379475&r2=1379476&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Aug 31 15:18:06 2012
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-487 HCatalog should tolerate a user-defined amount of bad records (traviscrawford)
+
   HCAT-488 TestHCatStorer should extend HCatBaseTest so it uses junit4 and runs inside an IDE (traviscrawford)
 
   HCAT-486 HCatalog should use checkstyle to enforce coding style (traviscrawford)
Modified: incubator/hcatalog/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/ivy.xml?rev=1379476&r1=1379475&r2=1379476&view=diff
==============================================================================
--- incubator/hcatalog/trunk/ivy.xml (original)
+++ incubator/hcatalog/trunk/ivy.xml Fri Aug 31 15:18:06 2012
@@ -74,6 +74,8 @@
[The XML content of this hunk (one dependency line removed, three added) was
stripped by the mail archive; see the viewvc URL above for the actual change.]

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1379476&r1=1379475&r2=1379476&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Fri Aug 31 15:18:06 2012
@@ -141,4 +141,22 @@ public final class HCatConstants {
 
   public static final String HCAT_DATA_TINY_SMALL_INT_PROMOTION = "hcat.data.tiny.small.int.promotion";
   public static final boolean HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT = false;
+
+  /**
+   * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT}).
+   * Threshold for the ratio of bad records that will be silently skipped without causing a task
+   * failure. This is useful when processing large data sets with corrupt records, when it's
+   * acceptable to skip some bad records.
+   */
+  public static final String HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY = "hcat.input.bad.record.threshold";
+  public static final float HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT = 0.0001f;
+
+  /**
+   * {@value} (default: {@value #HCAT_INPUT_BAD_RECORD_MIN_DEFAULT}).
+   * Number of bad records that will be accepted before applying
+   * {@value #HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY}. This is necessary to prevent an initial bad
+   * record from causing a task failure.
+   */
+  public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+  public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
 }
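For illustration, a minimal sketch (not part of this commit) of how a client
job might override these knobs before submission. The table name is a
placeholder; the Job and HCatInputFormat calls mirror those used in
TestHCatInputFormat below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    Configuration conf = new Configuration();
    // Tolerate up to 1% bad records instead of the 0.01% default.
    conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, 0.01f);
    // Skip the threshold check until at least 10 bad records have been seen.
    conf.setLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, 10L);

    Job job = new Job(conf);
    job.setInputFormatClass(HCatInputFormat.class);
    HCatInputFormat.setInput(job, InputJobInfo.create("default", "my_table", null));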
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1379476&r1=1379475&r2=1379476&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Fri Aug 31 15:18:06 2012
@@ -20,6 +20,7 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
@@ -45,8 +46,12 @@ import org.slf4j.LoggerFactory;
 class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
 
   private static final Logger LOG = LoggerFactory.getLogger(HCatRecordReader.class);
+
+  private InputErrorTracker errorTracker;
+
   WritableComparable currentKey;
   Writable currentValue;
+  HCatRecord currentHCatRecord;
 
   /** The underlying record reader to delegate to. */
   private org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseRecordReader;
@@ -95,6 +100,8 @@ class HCatRecordRea
     // Pull the table schema out of the Split info
     // TODO This should be passed in the TaskAttemptContext instead
     dataSchema = hcatSplit.getDataSchema();
+
+    errorTracker = new InputErrorTracker(taskContext.getConfiguration());
   }
 
   private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
@@ -137,30 +144,8 @@ class HCatRecordRea
    * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
    */
   @Override
-  public HCatRecord getCurrentValue()
-      throws IOException, InterruptedException {
-    HCatRecord r;
-
-    try {
-
-      r = new LazyHCatRecord(deserializer.deserialize(currentValue), deserializer.getObjectInspector());
-      DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
-      int i = 0;
-      for (String fieldName : outputSchema.getFieldNames()){
-        Integer dataPosn = null;
-        if ((dataPosn = dataSchema.getPosition(fieldName)) != null){
-          dr.set(i, r.get(fieldName,dataSchema));
-        } else {
-          dr.set(i, valuesNotInDataCols.get(fieldName));
-        }
-        i++;
-      }
-
-      return dr;
-
-    } catch (Exception e) {
-      throw new IOException("Failed to create HCatRecord ",e);
-    }
+  public HCatRecord getCurrentValue() throws IOException, InterruptedException {
+    return currentHCatRecord;
   }
 
   /* (non-Javadoc)
@@ -176,21 +161,59 @@ class HCatRecordRea
     return 0.0f; // errored
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-   */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (currentKey == null) {
-      currentKey = baseRecordReader.createKey();
-      currentValue = baseRecordReader.createValue();
+  /**
+   * Check if the wrapped RecordReader has another record, and if so convert it into an
+   * HCatRecord. We both check for records and convert here so a configurable percent of
+   * bad records can be tolerated.
+   *
+   * @return if there is a next record
+   * @throws IOException on error
+   * @throws InterruptedException on error
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (currentKey == null) {
+      currentKey = baseRecordReader.createKey();
+      currentValue = baseRecordReader.createValue();
+    }
+
+    while (baseRecordReader.next(currentKey, currentValue)) {
+      HCatRecord r = null;
+      Throwable t = null;
+
+      errorTracker.incRecords();
+
+      try {
+        Object o = deserializer.deserialize(currentValue);
+        r = new LazyHCatRecord(o, deserializer.getObjectInspector());
+      } catch (Throwable throwable) {
+        t = throwable;
+      }
+
+      if (r == null) {
+        errorTracker.incErrors(t);
+        continue;
       }
 
-    return baseRecordReader.next(currentKey,
-        currentValue);
+      DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
+      int i = 0;
+      for (String fieldName : outputSchema.getFieldNames()) {
+        if (dataSchema.getPosition(fieldName) != null) {
+          dr.set(i, r.get(fieldName, dataSchema));
+        } else {
+          dr.set(i, valuesNotInDataCols.get(fieldName));
+        }
+        i++;
+      }
+
+      currentHCatRecord = dr;
+      return true;
     }
 
-  /* (non-Javadoc)
+    return false;
+  }
+
+  /* (non-Javadoc)
    * @see org.apache.hadoop.mapreduce.RecordReader#close()
    */
   @Override
@@ -198,4 +221,64 @@ class HCatRecordRea
     baseRecordReader.close();
   }
 
+  /**
+   * Tracks the number of errors in input and throws a RuntimeException
+   * if the rate of errors crosses a limit.
+   *
+   * The intention is to skip over very rare file corruption or incorrect
+   * input, but catch programmer errors (incorrect format, or incorrect
+   * deserializers etc).
+   *
+   * This class was largely copied from Elephant-Bird (thanks @rangadi!)
+   * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java
+   */
+  static class InputErrorTracker {
+    long numRecords;
+    long numErrors;
+
+    double errorThreshold; // max fraction of errors allowed
+    long minErrors; // throw error only after this many errors
+
+    InputErrorTracker(Configuration conf) {
+      errorThreshold = conf.getFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY,
+          HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT);
+      minErrors = conf.getLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY,
+          HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_DEFAULT);
+      numRecords = 0;
+      numErrors = 0;
+    }
+
+    void incRecords() {
+      numRecords++;
+    }
+
+    void incErrors(Throwable cause) {
+      numErrors++;
+      if (numErrors > numRecords) {
+        // incorrect use of this class
+        throw new RuntimeException("Forgot to invoke incRecords()?");
+      }
+
+      if (cause == null) {
+        cause = new Exception("Unknown error");
+      }
+
+      if (errorThreshold <= 0) { // no errors are tolerated
+        throw new RuntimeException("error while reading input records", cause);
+      }
+
+      LOG.warn("Error while reading an input record ("
+          + numErrors + " out of " + numRecords + " so far): ", cause);
+
+      double errRate = numErrors / (double) numRecords;
+
+      // will always excuse the first error. We can decide if single
+      // error crosses threshold inside close() if we want to.
+      if (numErrors >= minErrors && errRate > errorThreshold) {
+        LOG.error(numErrors + " out of " + numRecords
+            + " crosses configured threshold (" + errorThreshold + ")");
+        throw new RuntimeException("error rate while reading input records crossed threshold", cause);
+      }
+    }
+  }
 }
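For illustration, a minimal sketch (not part of this commit) of how the
tracker gates failures under the default settings: the first bad record is
always excused by the min-errors floor, and the threshold comparison is
strictly greater-than. This assumes code living in the
org.apache.hcatalog.mapreduce package, since both classes are package-private:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration(); // defaults: threshold 0.0001, min 2
    HCatRecordReader.InputErrorTracker tracker =
        new HCatRecordReader.InputErrorTracker(conf);

    // First bad record: excused, since 1 error is below the min of 2, even
    // though the observed error rate is momentarily 1/1 = 100%.
    tracker.incRecords();
    tracker.incErrors(new Exception("corrupt record"));

    // 30,000 clean records drive the error rate well under the threshold.
    for (int i = 0; i < 30000; i++) {
      tracker.incRecords();
    }

    // Second bad record: 2 errors meets the min of 2, but 2/30002 is roughly
    // 0.00007, below the 0.0001 threshold, so this still does not throw. Had
    // the second error arrived within the first ~20,000 records, 2/N would
    // exceed the threshold and incErrors() would throw a RuntimeException.
    tracker.incRecords();
    tracker.incErrors(new Exception("corrupt record"));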
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java?rev=1379476&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatInputFormat.java Fri Aug 31 15:18:06 2012
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class TestHCatInputFormat extends HCatBaseTest {
+
+  private boolean setUpComplete = false;
+
+  /**
+   * Create an input sequence file with 100 records; every 10th record is bad.
+   * Load this table into Hive.
+   */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    if (setUpComplete) {
+      return;
+    }
+
+    Path intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+    LOG.info("Creating data file: " + intStringSeq);
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+        intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+        NullWritable.class, BytesWritable.class);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(out);
+    TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+    for (int i = 1; i <= 100; i++) {
+      if (i % 10 == 0) {
+        seqFileWriter.append(NullWritable.get(), new BytesWritable("bad record".getBytes()));
+      } else {
+        out.reset();
+        IntString intString = new IntString(i, Integer.toString(i), i);
+        intString.write(protocol);
+        BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+        seqFileWriter.append(NullWritable.get(), bytesWritable);
+      }
+    }
+
+    seqFileWriter.close();
+
+    // Now let's load this file into a new Hive table.
+    Assert.assertEquals(0, driver.run("drop table if exists test_bad_records").getResponseCode());
+    Assert.assertEquals(0, driver.run(
+        "create table test_bad_records " +
+            "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+            "with serdeproperties ( " +
+            "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+            "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+            "stored as" +
+            "  inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+            "  outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+        .getResponseCode());
+    Assert.assertEquals(0, driver.run("load data local inpath '" + intStringSeq.getParent() +
+        "' into table test_bad_records").getResponseCode());
+
+    setUpComplete = true;
+  }
+
+  @Test
+  public void testBadRecordHandlingPasses() throws Exception {
+    Assert.assertTrue(runJob(0.1f));
+  }
+
+  @Test
+  public void testBadRecordHandlingFails() throws Exception {
+    Assert.assertFalse(runJob(0.01f));
+  }
+
+  private boolean runJob(float badRecordThreshold) throws Exception {
+    Configuration conf = new Configuration();
+
+    conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, badRecordThreshold);
+
+    Job job = new Job(conf);
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(MyMapper.class);
+
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    HCatInputFormat.setInput(job, InputJobInfo.create("default", "test_bad_records", null));
+
+    job.setMapOutputKeyClass(HCatRecord.class);
+    job.setMapOutputValueClass(HCatRecord.class);
+
+    job.setNumReduceTasks(0);
+
+    Path path = new Path(TEST_DATA_DIR, "test_bad_record_handling_output");
+    if (path.getFileSystem(conf).exists(path)) {
+      path.getFileSystem(conf).delete(path, true);
+    }
+
+    TextOutputFormat.setOutputPath(job, path);
+
+    return job.waitForCompletion(true);
+  }
+
+  public static class MyMapper extends Mapper<NullWritable, HCatRecord, NullWritable, Text> {
+    @Override
+    public void map(NullWritable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      LOG.info("HCatRecord: " + value);
+      context.write(NullWritable.get(), new Text(value.toString()));
+    }
+  }
+}
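For reference (not part of this commit), the arithmetic behind the two tests
above: 10 of the 100 input records are bad, a 10% error rate. runJob(0.1f)
succeeds because at every bad record the observed rate is exactly 0.1 (1/10,
2/20, ..., 10/100) and the tracker only throws when the rate strictly exceeds
the threshold. runJob(0.01f) fails at the second bad record: 2 errors meets
the default minimum of 2, and 2/20 = 0.1 > 0.01, so InputErrorTracker throws,
the map task fails, and waitForCompletion(true) returns false.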