flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Need some help to understand the cause of the error
Date Fri, 26 Feb 2016 09:00:46 GMT
Hi,
as far as I can see, the problem is in this line:
k.sum(3)

Field indices are only valid for tuple types. In your case you should be able to use
this instead:
k.sum("field3")

because field3 is a field of your Reading type.
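
For illustration, here is a minimal sketch of summing by field name on a windowed stream. It is not the code from the original post: it assumes the Flink 1.x Scala DataStream API (org.apache.flink.streaming.api.scala._) and a plain StreamExecutionEnvironment rather than Flinkspector's DataStreamTestEnvironment, and the object name SumByFieldName and the job name are made up for the example.

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Reading(field1: String, field2: String, field3: Int)

// Hypothetical object name, used only for this sketch.
object SumByFieldName {

  // Sample records taken from the original post.
  val readings = Seq(
    Reading("hans", "elephant", 15),
    Reading("susi", "arctic", 20),
    Reading("pete", "elephant", 40))

  def main(args: Array[String]): Unit = {
    // Assumption: the regular Scala environment, not the Flinkspector test environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(readings)
      // Key by a field of the case class using a Scala function.
      .keyBy(_.field2)
      // Sliding window: 5 minutes long, evaluated every minute.
      .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))
      // Aggregate by field name instead of by position.
      .sum("field3")
      .print()

    env.execute("sum-by-field-name")
  }
}
```

With a small bounded source like this one, the job may finish before any window fires, so it may print nothing. The point of the sketch is only the sum("field3") call.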

Cheers,
Aljoscha
> On 26 Feb 2016, at 02:44, Nirmalya Sengupta <sengupta.nirmalya@gmail.com> wrote:
> 
> Hello Flinksters,
> 
> I am trying to use Flinkspector in a Scala code snippet of mine and Flink is complaining. The code is here:
> 
> ---------------------------------------------------------------------------------------------------------------
> 
> case class Reading(field1:String,field2:String,field3:Int)
> 
> object MultiWindowing {
> 
>   def main(args: Array[String]) {}
> 
>   //  WindowFunction<IN,OUT,KEY,W extends Window>
> 
>   class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {
> 
>       //  .....
>     }
>   }
> 
>   val env = DataStreamTestEnvironment.createTestEnvironment(1)
> 
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
>   val input: EventTimeInput[Reading]  =
>     EventTimeInputBuilder
>     .startWith(Reading("hans", "elephant", 15))
>     .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
>     .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))
> 
>   //acquire data source from input
>   val stream = env.fromInput(input)
> 
>   //apply transformation
>   val k = stream.keyBy(new KeySelector [Reading,String] {
>     def getKey(r:Reading) =  r.field2
>   })
>     .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))
> 
>     k.sum(3)
>     .print()
> 
>   env.execute()
> 
> }
> 
> ---------------------------------------------------------------------------------------------------------------
> 
> And at runtime, I get this error:
> 
> ----------------------------------------------------------------------------------------------------------------
> 
> Exception in thread "main" java.lang.ExceptionInInitializerError
> 	at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
> 	at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
> 	at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
> 	at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
> 	at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
> 	at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
> 	... 6 more
> 
> 
> ---------------------------------------------------------------------------------------------------------------
> 
> Can someone help me by pointing out the mistake I am making?
> 
> -- Nirmalya
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is where they should be.
> Now put the foundation under them."

