From: toffer@apache.org
To: hcatalog-commits@incubator.apache.org
Reply-To: hcatalog-dev@incubator.apache.org
Subject: svn commit: r1338997 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/
Date: Wed, 16 May 2012 04:10:08 -0000
Message-Id: <20120516041008.E58B7238890B@eris.apache.org>

Author: toffer
Date: Wed May 16 04:10:08 2012
New Revision: 1338997

URL: http://svn.apache.org/viewvc?rev=1338997&view=rev
Log:
merged from trunk: HCATALOG-373 ProgressReporter should work with both old and new MR API (traviscrawford via toffer)

Modified:
    incubator/hcatalog/branches/branch-0.4/   (props changed)
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
    incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java

Propchange: incubator/hcatalog/branches/branch-0.4/
------------------------------------------------------------------------------
  Merged /incubator/hcatalog/trunk:r1338996

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1338997&r1=1338996&r2=1338997&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Wed May 16 04:10:08 2012
@@ -89,6 +89,8 @@ Release 0.4.0 - Unreleased
   HCAT-2 Support nested schema conversion between Hive an Pig
   (julienledem via hashutosh)

   IMPROVEMENTS

+  HCAT-373 ProgressReporter should work with both old and new MR API (traviscrawford via toffer)
+
   HCAT-68 Logging from HCat (avandana via toffer)

   HCAT-383 Add clover to build.xml (gates)
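The problem HCATALOG-373 addresses, in miniature: a reader written against the old mapred.* API expects a mapred.Reporter, while a task running on the new mapreduce.* API only holds a TaskAttemptContext. The rewritten ProgressReporter (diffed below) adapts one to the other. A minimal usage sketch, assuming same-package access to the package-private ProgressReporter; oldFormat and split are illustrative placeholders, not names from this patch:

import java.io.IOException;

import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class OldNewApiBridgeSketch {
  // Opens an old-API reader from inside a new-API task by adapting the
  // task context to the old Reporter interface.
  static <K, V> RecordReader<K, V> openOldApiReader(InputFormat<K, V> oldFormat,
      InputSplit split, JobConf jobConf, TaskAttemptContext taskContext)
      throws IOException {
    Reporter reporter = new ProgressReporter(taskContext);
    return oldFormat.getRecordReader(split, jobConf, reporter);
  }
}

This mirrors what the new createBaseRecordReader() in HCatRecordReader.java (further below) now does.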
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1338997&r1=1338996&r2=1338997&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Wed May 16 04:10:08 2012
@@ -25,15 +25,12 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.List;

-import org.apache.hadoop.hive.serde2.SerDe;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -83,7 +80,7 @@ public abstract class HCatBaseInputForma
         HCatUtil.serialize(hcatSchema));
   }

-  private static
+  protected static
   org.apache.hadoop.mapred.InputFormat getMapRedInputFormat
     (JobConf job, Class inputFormatClass) throws IOException {
     return (
@@ -178,7 +175,7 @@ public abstract class HCatBaseInputForma
   createRecordReader(InputSplit split,
       TaskAttemptContext taskContext) throws IOException, InterruptedException {

-    HCatSplit hcatSplit = (HCatSplit) split;
+    HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
     JobContext jobContext = taskContext;

@@ -186,46 +183,17 @@ public abstract class HCatBaseInputForma
         jobContext.getConfiguration(), partitionInfo);

     JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
-
-    Class inputFormatClass = storageHandler.getInputFormatClass();
-    org.apache.hadoop.mapred.InputFormat inputFormat =
-        getMapRedInputFormat(jobConf, inputFormatClass);
-
     Map jobProperties = partitionInfo.getJobProperties();
     HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

-    Reporter reporter = InternalUtil.createReporter(taskContext);
-    org.apache.hadoop.mapred.RecordReader recordReader =
-        inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, reporter);
-
-    SerDe serde;
-    try {
-      serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
-          jobContext.getConfiguration());
-
-//      HCatUtil.logEntrySet(LOG, "props to serde", properties.entrySet());
-
-      Configuration conf = storageHandler.getConf();
-      InternalUtil.initializeInputSerDe(serde, conf,
-          partitionInfo.getTableInfo(),partitionInfo.getPartitionSchema());
-
-    } catch (Exception e) {
-      throw new IOException("Unable to create objectInspector "
-          + "for serde class " + storageHandler.getSerDeClass().getName()
-          + e);
-    }

     Map valuesNotInDataCols = getColValsNotInDataColumns(
         getOutputSchema(jobContext),partitionInfo
     );

-    HCatRecordReader hcatRecordReader = new HCatRecordReader(storageHandler,
-        recordReader,
-        serde,
-        valuesNotInDataCols);
-
-    return hcatRecordReader;
+    return new HCatRecordReader(storageHandler, valuesNotInDataCols);
   }
-
+
   /**
    * gets values for fields requested by output schema which will not be in the data
   */
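The shape of the createRecordReader() refactor above, reduced to a skeleton: the constructor only stores references, and every step that needs the split or task configuration is deferred to initialize(), which the MR framework calls before the first record is read. An illustrative pattern sketch, not HCatalog code:

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

abstract class DeferredSetupReader<K, V> extends RecordReader<K, V> {
  private boolean initialized = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Expensive setup (underlying reader, SerDe) happens here, with the
    // split and task context in hand -- as HCatRecordReader now does.
    initialized = true;
  }

  protected boolean isInitialized() {
    return initialized;
  }
}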
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1338997&r1=1338996&r2=1338997&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Wed May 16 04:10:08 2012
@@ -19,13 +19,15 @@ package org.apache.hcatalog.mapreduce;

 import java.io.IOException;
 import java.util.Map;
-import java.util.Properties;

 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
@@ -47,9 +49,7 @@ class HCatRecordReader extends RecordRea
   Writable currentValue;

   /** The underlying record reader to delegate to. */
-  //org.apache.hadoop.mapred.
-  private final org.apache.hadoop.mapred.RecordReader
-      baseRecordReader;
+  private org.apache.hadoop.mapred.RecordReader baseRecordReader;

   /** The storage handler used */
   private final HCatStorageHandler storageHandler;
@@ -63,16 +63,10 @@ class HCatRecordRea
   /**
    * Instantiates a new hcat record reader.
-   * @param baseRecordReader the base record reader
   */
   public HCatRecordReader(HCatStorageHandler storageHandler,
-      org.apache.hadoop.mapred.RecordReader baseRecordReader,
-      SerDe serde,
       Map valuesNotInDataCols) {
-    this.baseRecordReader = baseRecordReader;
     this.storageHandler = storageHandler;
-    this.serde = serde;
     this.valuesNotInDataCols = valuesNotInDataCols;
   }

@@ -83,37 +77,56 @@ class HCatRecordRea
   */
  @Override
  public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
-                         TaskAttemptContext taskContext)
-      throws IOException, InterruptedException {
-    org.apache.hadoop.mapred.InputSplit baseSplit;
+      TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+    HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
+
+    baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
+    serde = createSerDe(hcatSplit, storageHandler, taskContext);

-    // Pull the output schema out of the TaskAttemptContext
-    outputSchema = (HCatSchema)HCatUtil.deserialize(
+    // Pull the output schema out of the TaskAttemptContext
+    outputSchema = (HCatSchema) HCatUtil.deserialize(
         taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));

-    if( split instanceof HCatSplit ) {
-      baseSplit = ((HCatSplit) split).getBaseSplit();
-    } else {
-      throw new IOException("Not a HCatSplit");
-    }
+    if (outputSchema == null) {
+      outputSchema = hcatSplit.getTableSchema();
+    }

-    if (outputSchema == null){
-      outputSchema = ((HCatSplit) split).getTableSchema();
-    }
+    // Pull the table schema out of the Split info
+    // TODO This should be passed in the TaskAttemptContext instead
+    dataSchema = hcatSplit.getDataSchema();
+  }

-    // Pull the table schema out of the Split info
-    // TODO This should be passed in the TaskAttemptContext instead
-    dataSchema = ((HCatSplit)split).getDataSchema();
-
-    Properties properties = new Properties();
-    for (Map.Entry<String, String> param :
-        ((HCatSplit)split).getPartitionInfo()
-            .getJobProperties().entrySet()) {
-      properties.setProperty(param.getKey(), param.getValue());
-    }
+  private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
+      HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException {
+
+    JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
+    HCatUtil.copyJobPropertiesToJobConf(hcatSplit.getPartitionInfo().getJobProperties(), jobConf);
+    org.apache.hadoop.mapred.InputFormat inputFormat =
+        HCatInputFormat.getMapRedInputFormat(jobConf, storageHandler.getInputFormatClass());
+    return inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf,
+        InternalUtil.createReporter(taskContext));
   }

-  /* (non-Javadoc)
+  private SerDe createSerDe(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
+      TaskAttemptContext taskContext) throws IOException {
+
+    SerDe serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
+        taskContext.getConfiguration());
+
+    try {
+      InternalUtil.initializeInputSerDe(serde, storageHandler.getConf(),
+          hcatSplit.getPartitionInfo().getTableInfo(),
+          hcatSplit.getPartitionInfo().getPartitionSchema());
+    } catch (SerDeException e) {
+      throw new IOException("Failed initializing SerDe "
+          + storageHandler.getSerDeClass().getName(), e);
+    }
+
+    return serde;
+  }
+
+  /* (non-Javadoc)
   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
   */
  @Override
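For readers unfamiliar with the SerDe bootstrap that createSerDe() performs: the SerDe class named by the storage handler is instantiated reflectively, then initialized against the table/partition schema. A self-contained sketch of the same two steps, using LazySimpleSerDe and a hard-coded two-column schema purely as stand-ins for what InternalUtil.initializeInputSerDe derives from table metadata:

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.util.ReflectionUtils;

class SerDeInitSketch {
  static SerDe newInitializedSerDe(Configuration conf) throws SerDeException {
    // Step 1: reflective instantiation, as createSerDe() does with
    // storageHandler.getSerDeClass().
    SerDe serde = ReflectionUtils.newInstance(LazySimpleSerDe.class, conf);

    // Step 2: initialize with the schema, expressed as Properties.
    Properties schema = new Properties();
    schema.setProperty("columns", "id,name");          // column names
    schema.setProperty("columns.types", "int,string"); // matching Hive types
    serde.initialize(conf, schema);
    return serde;
  }
}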
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1338997&r1=1338996&r2=1338997&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Wed May 16 04:10:08 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
@@ -168,4 +169,18 @@ static Reporter createReporter(TaskAttem
     return new ProgressReporter(context);
   }

+  /**
+   * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
+   * @param split the InputSplit
+   * @return the HCatSplit
+   * @throws IOException
+   */
+  public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
+    if (split instanceof HCatSplit) {
+      return (HCatSplit) split;
+    } else {
+      throw new IOException("Split must be " + HCatSplit.class.getName()
+          + " but found " + split.getClass().getName());
+    }
+  }
 }
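The new castToHCatSplit() is a small defensive-downcast helper: unlike a bare (HCatSplit) split, a failed cast reports both the expected and the actual class instead of an uninformative ClassCastException. A generic version of the same pattern, standalone for illustration (not part of this patch):

import java.io.IOException;

class SafeCastSketch {
  static <T> T castOrExplain(Object value, Class<T> expected) throws IOException {
    if (expected.isInstance(value)) {
      return expected.cast(value);
    }
    throw new IOException("Expected " + expected.getName()
        + " but found " + value.getClass().getName());
  }
}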
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java?rev=1338997&r1=1338996&r2=1338997&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java Wed May 16 04:10:08 2012
@@ -21,53 +21,65 @@ package org.apache.hcatalog.mapreduce;

 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.Progressable;

-class ProgressReporter implements Reporter {
+class ProgressReporter extends StatusReporter implements Reporter {

-  private Progressable progressable;
+  private TaskInputOutputContext context = null;
+  private TaskAttemptContext taskAttemptContext = null;

-  public ProgressReporter(TaskAttemptContext context) {
-    this(context instanceof TaskInputOutputContext?
-        (TaskInputOutputContext)context:
-        Reporter.NULL);
-  }
-
-  public ProgressReporter(Progressable progressable) {
-    this.progressable = progressable;
-  }
-
-  @Override
-  public void setStatus(String status) {
-  }
-
-  @Override
-  public Counters.Counter getCounter(Enum name) {
-    return Reporter.NULL.getCounter(name);
-  }
-
-  @Override
-  public Counters.Counter getCounter(String group, String name) {
-    return Reporter.NULL.getCounter(group,name);
-  }
-
-  @Override
-  public void incrCounter(Enum key, long amount) {
-  }
-
-  @Override
-  public void incrCounter(String group, String counter, long amount) {
-  }
-
-  @Override
-  public InputSplit getInputSplit() throws UnsupportedOperationException {
-    return Reporter.NULL.getInputSplit();
-  }
-
-  @Override
-  public void progress() {
-    progressable.progress();
+  public ProgressReporter(TaskAttemptContext context) {
+    if (context instanceof TaskInputOutputContext) {
+      this.context = (TaskInputOutputContext) context;
+    } else {
+      taskAttemptContext = context;
+    }
+  }
+
+  @Override
+  public void setStatus(String status) {
+    if (context != null) {
+      context.setStatus(status);
+    }
+  }
+
+  @Override
+  public Counters.Counter getCounter(Enum name) {
+    return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
+  }
+
+  @Override
+  public Counters.Counter getCounter(String group, String name) {
+    return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+  }
+
+  @Override
+  public void incrCounter(Enum key, long amount) {
+    if (context != null) {
+      context.getCounter(key).increment(amount);
+    }
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    if (context != null) {
+      context.getCounter(group, counter).increment(amount);
+    }
+  }
+
+  @Override
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    return null;
+  }
+
+  @Override
+  public void progress() {
+    if (context != null) {
+      context.progress();
+    } else {
+      taskAttemptContext.progress();
     }
+  }
 }
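What the rewrite changes in behavior: the old implementation forwarded everything except progress() to Reporter.NULL, so status and counter updates made through the old API were silently dropped inside new-API tasks; now they reach the live task context. A hypothetical call site (MyCounters is an illustrative enum, context stands for the task's real TaskInputOutputContext, and same-package access to ProgressReporter is assumed, as in HCatalog itself):

import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

class CounterPropagationSketch {
  enum MyCounters { RECORDS_READ }

  static void countOneRecord(TaskInputOutputContext<?, ?, ?, ?> context) {
    Reporter reporter = new ProgressReporter(context);
    reporter.incrCounter(MyCounters.RECORDS_READ, 1); // now lands in real job counters
  }
}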
Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1338997&r1=1338996&r2=1338997&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Wed May 16 04:10:08 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -267,6 +268,13 @@ public abstract class HCatMapReduceTest
     HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));

     boolean success = job.waitForCompletion(true);
+
+    // Ensure counters are set when data has actually been read.
+    if (partitionValues != null) {
+      assertTrue(job.getCounters().getGroup("FileSystemCounters")
+          .findCounter("FILE_BYTES_READ").getValue() > 0);
+    }
+
     if (success) {
       new FileOutputCommitterContainer(job,null).commitJob(job);
     } else {
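The new assertion works because FILE_BYTES_READ is now fed by the underlying reader through ProgressReporter, rather than vanishing into Reporter.NULL. For reference, reading such a counter from any completed job uses only stock Hadoop API; a generic helper, not part of this patch, using the stringly-typed pre-2.x group and counter names that the test itself uses:

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;

class CounterReadSketch {
  static long fileBytesRead(Job completedJob) throws IOException {
    // Group "FileSystemCounters" / counter "FILE_BYTES_READ" track local
    // filesystem bytes read by the task's underlying reader.
    return completedJob.getCounters()
        .getGroup("FileSystemCounters")
        .findCounter("FILE_BYTES_READ")
        .getValue();
  }
}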