crunch-dev mailing list archives

From "Stan Rosenberg (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (CRUNCH-597) Unable to process parquet files using Hadoop
Date Fri, 18 Mar 2016 16:47:33 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201762#comment-15201762 ]

Stan Rosenberg edited comment on CRUNCH-597 at 3/18/16 4:46 PM:
----------------------------------------------------------------

The code executes without error after upgrading Crunch to use Parquet 1.8.1 (see https://github.com/srosenberg/crunch/commit/d78e1793d0313b073264aa607bfebd610bd8c0dd).

Some investigation shows that the bug was fixed in a later version of parquet-hadoop. The version used by Crunch expects the input split to already be a ParquetInputSplit in ParquetRecordReader.initialize:

{code}
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (context instanceof TaskInputOutputContext) {
    BenchmarkCounter.initCounterFromContext((TaskInputOutputContext) context);
  } else {
    LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
        + context.getClass().getCanonicalName());
  }
  // unconditional cast: throws ClassCastException when the framework hands over a plain FileSplit
  this.initializeInternalReader((ParquetInputSplit) inputSplit, ContextUtil.getConfiguration(context));
}
{code}

whereas the latest implementation converts the incoming FileSplit to a ParquetInputSplit via toParquetSplit:

{code}
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) {
    BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>) context);
  } else {
    LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
        + context.getClass().getCanonicalName());
  }
  // the split is converted rather than cast, so a plain FileSplit is accepted
  initializeInternalReader(toParquetSplit(inputSplit), ContextUtil.getConfiguration(context));
}
{code}
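
For reference, the conversion step lives in a toParquetSplit helper in newer parquet-hadoop releases. The following is only a paraphrased sketch, not the verbatim library source, and it assumes the ParquetInputSplit.from(FileSplit) factory that ships with parquet-hadoop 1.8.x:

{code}
// Sketch only: how toParquetSplit plausibly bridges the two split types.
// Assumes org.apache.parquet.hadoop.ParquetInputSplit.from(FileSplit) is available (parquet-hadoop 1.8.x).
import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.parquet.hadoop.ParquetInputSplit;

final class SplitConversion {
  static ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
    if (split instanceof ParquetInputSplit) {
      return (ParquetInputSplit) split;                 // already the expected type
    } else if (split instanceof FileSplit) {
      return ParquetInputSplit.from((FileSplit) split); // the case Crunch's CrunchRecordReader hits
    }
    throw new IllegalArgumentException("Unsupported split type: " + split.getClass().getName());
  }
}
{code}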


was (Author: srosenberg):
The code executes without error after upgrading crunch to use parquet 1.8.1.  (See https://github.com/srosenberg/crunch/commit/d78e1793d0313b073264aa607bfebd610bd8c0dd)

Upon some investigation, it appears that the bug was fixed in a later version of parquet-hadoop.
 The version used by crunch expects the input split to be of type ParquetInputSplit in ParquetRecordReader.initialize,

{code}
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (context instanceof TaskInputOutputContext) {
    BenchmarkCounter.initCounterFromContext((TaskInputOutputContext) context);
  } else {
    LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
        + context.getClass().getCanonicalName());
  }
  this.initializeInternalReader((ParquetInputSplit) inputSplit, ContextUtil.getConfiguration(context));
}
{code}

whereas the latest implementation converts between FileSplit and ParquetInputSplit,

{code}
public void initialize(InputSplit inputSplit, Configuration configuration, Reporter reporter)
    throws IOException, InterruptedException {
  BenchmarkCounter.initCounterFromReporter(reporter, configuration);
  initializeInternalReader(toParquetSplit(inputSplit), configuration);
}
{code}

> Unable to process parquet files using Hadoop
> --------------------------------------------
>
>                 Key: CRUNCH-597
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-597
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core, IO
>    Affects Versions: 0.13.0
>            Reporter: Stan Rosenberg
>            Assignee: Josh Wills
>
> Current version of parquet-hadoop results in the following stack trace while attempting to read from a parquet file.
> {code}
> java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to parquet.hadoop.ParquetInputSplit
> 	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
> Caused by: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to parquet.hadoop.ParquetInputSplit
> 	at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:107)
> 	at org.apache.crunch.impl.mr.run.CrunchRecordReader.initialize(CrunchRecordReader.java:140)
> 	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
> 	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
> 	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> 	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> Here is the relevant code snippet that yields the above stack trace when executed locally:
> {code}
> Pipeline pipeline = new MRPipeline(Crunch.class, conf);
> PCollection<Pair<String, Observation>> observations =
>     pipeline.read(AvroParquetFileSource.builder(record).build(new Path(args[0])))
>             .parallelDo(new TranslateFn(), Avros.tableOf(Avros.strings(), Avros.specifics(Observation.class)));
> for (Pair<String, Observation> pair : observations.materialize()) {
>       System.out.println(pair.second());
> }
> PipelineResult result = pipeline.done();
> {code}
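
TranslateFn itself is not included in the report. Purely as an illustration, a function of that shape could be a Crunch MapFn along the following lines; the getStationId() key field is invented here, since the actual Observation schema is not shown:

{code}
// Hypothetical sketch of TranslateFn; the key field (getStationId) is invented for illustration.
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;

public class TranslateFn extends MapFn<Observation, Pair<String, Observation>> {
  @Override
  public Pair<String, Observation> map(Observation obs) {
    // key each Avro record by some string-valued field so the result matches
    // Avros.tableOf(Avros.strings(), Avros.specifics(Observation.class))
    return Pair.of(obs.getStationId().toString(), obs);
  }
}
{code}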



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
