flink-issues mailing list archives

From "Moritz Schubotz (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3568) Hadoop's Bzip decompression is not thread safe
Date Fri, 04 Mar 2016 10:42:40 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179712#comment-15179712 ]

Moritz Schubotz commented on FLINK-3568:
----------------------------------------

What exactly do you mean by
"removing the 'shaded-hadoop' and adding a newer org.apache.hadoop dependency"?


> Hadoop's Bzip decompression is not thread safe
> ----------------------------------------------
>
>                 Key: FLINK-3568
>                 URL: https://issues.apache.org/jira/browse/FLINK-3568
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>    Affects Versions: 0.10.1
>            Reporter: Sebastian Neef
>            Priority: Critical
>
> Hi,
> First of all, this is my first time filing a bug report for Apache Flink. If you need any additional information, please let me know.
> h1. Background
> I was trying to process [Wikipedia's XML dumps|https://dumps.wikimedia.org/enwiki/20160204/] with Apache Flink. To save disk space, I decided to use the bzipped versions.
> Apache Flink is compatible with [Hadoop's InputFormats|https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html], and Hadoop's TextInputFormat [supports compressed files|https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.html#line.5]. [Bzip2 files are splittable|http://comphadoop.weebly.com/index.html] and thus well suited for parallel processing.
> h1. Problem
> I started to test the decompression with a simple Job based on the Apache Flink Quickstart
code:
> {code}
>     public static void main(String[] args) throws Exception {
>         if(args.length != 2) {
>             System.err.println("USAGE: Job <wikipediadump.xml.bz2> <output.txt>");
>             return;
>         }
>         // set up the execution environment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Tuple2<LongWritable, Text>> input =
>                 env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0]);
>         input.writeAsText(args[1]);
>         // execute program
>         env.execute("Bzip compression test");
>     }
> {code}
> When starting the job, I get the following exception:
> {noformat}
> 02/29/2016 11:59:50 CHAIN DataSource (at createInput(ExecutionEnvironment.java:508) (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> Map (Map at main(Job.java:67))(1/2) switched to FAILED
> java.lang.ArrayIndexOutOfBoundsException: 18002
>     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:730)
>     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:801)
>     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
>     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:399)
>     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:483)
>     at java.io.InputStream.read(InputStream.java:101)
>     at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
>     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>     at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>     at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.fetchNext(HadoopInputFormatBase.java:185)
>     at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.reachedEnd(HadoopInputFormatBase.java:179)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:166)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This does not happen with "-p 1", but it does with a parallelism greater than 1.
> h1. Research
> Googling the error message leads to some Spark/Hadoop mailing lists, and it looks like the "compress.bzip2.CBZip2InputStream" class in use is not thread-safe:
> - [Link one|https://issues.apache.org/jira/browse/HADOOP-10614]
> - [Link two|http://stackoverflow.com/questions/5159602/processing-a-bzip-string-file-in-scala]
> - [Link three|http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ArrayIndexOutOfBoundsException-using-sc-textFile-on-BZ2-compressed-files-td22905.html]
> - [Link four|https://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3C1402318634.7682.YahooMailNeo@web190401.mail.sg3.yahoo.com%3E]
> Link one is especially interesting, because the Hadoop project resolved the issue there:
> {quote}
> Hadoop uses CBZip2InputStream to decode bzip2 files. However, the implementation is not
threadsafe. This is not a really problem for Hadoop MapReduce because Hadoop runs each task
in a separate JVM. But for other libraries that utilize multithreading and use Hadoop's InputFormat,
e.g., Spark, it will cause exceptions like the following:
> {quote}
> My guess is that Apache Flink needs to update or patch the CBZip2InputStream class to resolve the problem, correct?
> All the best,
> Sebastian
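The hazard described in the quoted report can be sketched generically: a decoder that keeps mutable per-call state in instance fields breaks when several task threads share one instance. One common mitigation, shown below, is to give each thread its own instance via ThreadLocal. This is an illustrative pattern only, not the actual HADOOP-10614 fix; UnsafeDecoder and PerThreadDecoderDemo are hypothetical names standing in for CBZip2InputStream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a non-thread-safe stream decoder such as
// CBZip2InputStream: it keeps mutable per-call state in an instance field,
// so two threads sharing one instance can corrupt each other's state.
class UnsafeDecoder {
    private int cursor; // shared mutable state, analogous to the decoding tables

    int decode(int[] data) {
        int sum = 0;
        for (int i = 0; i < data.length; i++) {
            cursor = i;          // another thread could overwrite this mid-loop
            sum += data[cursor];
        }
        return sum;
    }
}

public class PerThreadDecoderDemo {
    // Mitigation sketch: never share the decoder. Each thread gets its own
    // instance, so no instance state is visible to two threads at once.
    private static final ThreadLocal<UnsafeDecoder> DECODERS =
            ThreadLocal.withInitial(UnsafeDecoder::new);

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int[] block = {1, 2, 3, 4};
        List<Future<Integer>> results = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            results.add(pool.submit(() -> DECODERS.get().decode(block)));
        }
        for (Future<Integer> r : results) {
            System.out.println(r.get()); // each thread decodes correctly: 10
        }
        pool.shutdown();
    }
}
```

A synchronized wrapper around a single shared instance would also avoid the corruption, at the cost of serializing all decode calls; per-thread instances preserve parallelism, which matters for splittable bzip2 input.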



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
