flink-user mailing list archives

From Sebastian <ssc.o...@googlemail.com>
Subject Re: read .gz files
Date Thu, 19 Feb 2015 20:56:19 GMT
I tried to follow the example on the web page like this:

-----------------------------------------------------------------------

implicit val env = ExecutionEnvironment.getExecutionEnvironment

val job = Job.getInstance

val hadoopInput = new HadoopInputFormat[LongWritable,Text](
   new TextInputFormat, classOf[LongWritable], classOf[Text], job)

FileInputFormat.addInputPath(job, new Path("/home/ssc/pld-index.gz"))

val lines: DataSet[Tuple2[LongWritable, Text]] =
     env.createInput(hadoopInput)

val numLines = lines.map { _ => Tuple1(1) }
                       .sum(0)

numLines.printToErr()

env.execute()

-----------------------------------------------------------------------

Unfortunately, I get the following exception, which I cannot resolve:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:258)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:201)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:188)
	at io.ssc.trackthetrackers.analysis.statistics.Playing$delayedInit$body.apply(Playing.scala:24)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
	at scala.App$class.main(App.scala:71)
	at io.ssc.trackthetrackers.analysis.statistics.Playing$.main(Playing.scala:15)
	at io.ssc.trackthetrackers.analysis.statistics.Playing.main(Playing.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

Any tips on how to proceed?

Best,
Sebastian
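For readers following along: the line count Sebastian is after can be sanity-checked outside Flink with plain java.util.zip. This is a standalone sketch, not the thread's resolution; it writes a small gzipped sample (a stand-in for the real pld-index.gz) and counts its lines the way a text input format would, which separates any gzip-handling question from the type-extraction error above.

```java
import java.io.*;
import java.nio.file.*;
import java.util.zip.*;

public class GzipLineCount {
    // Decompress a .gz file and count its lines, the same work the
    // Hadoop TextInputFormat + GzipCodec path performs inside Flink.
    public static long countLines(Path gz) throws IOException {
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gz))))) {
            return r.lines().count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Temp file standing in for the real input; three sample lines.
        Path gz = Files.createTempFile("pld-index-sample", ".gz");
        try (Writer w = new OutputStreamWriter(
                new GZIPOutputStream(Files.newOutputStream(gz)))) {
            w.write("a\nb\nc\n");
        }
        System.out.println(countLines(gz)); // prints 3
    }
}
```

If this round trip works on the actual file, the exception above points at type extraction, not at gzip decompression.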
On 19.02.2015 21:36, Robert Metzger wrote:
> I just had a look at Hadoop's TextInputFormat.
> In hadoop-common-2.2.0.jar there are the following compression codecs
> contained:
>
> org.apache.hadoop.io.compress.BZip2Codec
> org.apache.hadoop.io.compress.DefaultCodec
> org.apache.hadoop.io.compress.DeflateCodec
> org.apache.hadoop.io.compress.GzipCodec
> org.apache.hadoop.io.compress.Lz4Codec
> org.apache.hadoop.io.compress.SnappyCodec
>
> (See also CompressionCodecFactory). So you should be good to go.
>
>
> On Thu, Feb 19, 2015 at 9:31 PM, Robert Metzger <rmetzger@apache.org
> <mailto:rmetzger@apache.org>> wrote:
>
>     Hi,
>
>     right now Flink itself only has support for reading ".deflate"
>     files. It's basically the same algorithm as gzip, but gzip files
>     seem to have some header which makes the two formats incompatible.
>
>     But you can easily use HadoopInputFormats with Flink. I'm sure there
>     is a Hadoop IF for reading gzip'ed files.
>
>
>     Best,
>     Robert
>
>
>     On Thu, Feb 19, 2015 at 9:25 PM, Sebastian <ssc.open@googlemail.com
>     <mailto:ssc.open@googlemail.com>> wrote:
>
>         Hi,
>
>         does Flink support reading gzipped files? Haven't found any info
>         about this on the website.
>
>         Best,
>         Sebastian
>
>
>
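Robert's remark about the header can be checked with java.util.zip alone, no Flink or Hadoop on the classpath. In this sketch the same payload is compressed both ways: gzip output starts with the magic bytes 0x1f 0x8b (RFC 1952 framing around a DEFLATE stream), while DeflaterOutputStream emits zlib framing whose first byte is 0x78 with default settings — one concrete way to see why the two file formats are incompatible even though the underlying algorithm is the same.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipVsDeflate {
    // Compress with gzip framing: 10-byte header + DEFLATE data + CRC trailer.
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) { out.write(data); }
        return bos.toByteArray();
    }

    // Compress with zlib framing, as DeflaterOutputStream does by default.
    public static byte[] deflate(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bos)) { out.write(data); }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "the same payload".getBytes("UTF-8");
        byte[] g = gzip(data), d = deflate(data);
        System.out.printf("gzip starts 0x%02x 0x%02x, deflate starts 0x%02x%n",
                g[0] & 0xff, g[1] & 0xff, d[0] & 0xff); // 0x1f 0x8b vs 0x78
        // Round trip through GZIPInputStream to confirm the payload survives.
        byte[] back = new GZIPInputStream(new ByteArrayInputStream(g)).readAllBytes();
        System.out.println(new String(back, "UTF-8")); // prints "the same payload"
    }
}
```

A reader expecting one framing will reject the other at the first header byte, which matches the observation that Flink's ".deflate" support and .gz files don't mix.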
