flink-user mailing list archives

From Nirmalya Sengupta <sengupta.nirma...@gmail.com>
Subject Need some help to understand the cause of the error
Date Fri, 26 Feb 2016 01:44:15 GMT
Hello Flinksters,

I am trying to use Flinkspector in a Scala code snippet of mine and Flink
is complaining. The code is here:

---------------------------------------------------------------------------------------------------------------

case class Reading(field1: String, field2: String, field3: Int)

object MultiWindowing {

  def main(args: Array[String]) {}

  //  WindowFunction<IN,OUT,KEY,W extends Window>

  class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {

      //  .....
    }
  }

  val env = DataStreamTestEnvironment.createTestEnvironment(1)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: EventTimeInput[Reading]  =
    EventTimeInputBuilder
    .startWith(Reading("hans", "elephant", 15))
    .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
    .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))

  //acquire data source from input
  val stream = env.fromInput(input)

  //apply transformation
  val k = stream.keyBy(new KeySelector[Reading, String] {
    def getKey(r: Reading) = r.field2
  })
    .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))

  k.sum(3)
    .print()

  env.execute()

}

---------------------------------------------------------------------------------------------------------------

And at runtime, I get this error:

----------------------------------------------------------------------------------------------------------------

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
    at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
    at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
    at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
    at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
    at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
    ... 6 more


---------------------------------------------------------------------------------------------------------------
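
To narrow this down, here is a minimal sketch in plain Scala, with no Flink
involved, of how the fields of Reading are numbered by position. It assumes
that positional field access is zero-based, as it is for scala.Product; it
does not reproduce what Flink's FieldAccessor does internally, and the object
name FieldPositions is made up for illustration. Under that assumption the
valid positions for Reading are 0 to 2, so the 3 passed to sum() would be out
of range.

```scala
// Same case class as in the snippet above.
case class Reading(field1: String, field2: String, field3: Int)

// Hypothetical helper object, for illustration only (not part of Flink).
object FieldPositions {
  def main(args: Array[String]): Unit = {
    val r = Reading("hans", "elephant", 15)

    // A case class is a scala.Product, so its fields can be addressed
    // by zero-based position, much like the fields of a tuple.
    assert(r.productArity == 3)

    // Position 2 is the last valid position and holds field3.
    assert(r.productElement(2) == 15)

    // Position 3 is past the end of a three-field type.
    val outOfRange =
      try { r.productElement(3); false }
      catch { case _: IndexOutOfBoundsException => true }
    assert(outOfRange)
  }
}
```

The exception message itself says the type was treated as a simple
(non-tuple, non-array) type, for which only position 0 is accepted, so
changing 3 to 2 may not be enough on its own.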

Can someone help me by pointing out the mistake I am making?

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
