flink-user mailing list archives

From Chiwan Park <chiwanp...@apache.org>
Subject Re: Discarding header from CSV file
Date Wed, 27 Apr 2016 10:18:59 GMT
Hi,

You don't need to call execute() after print(). In the DataSet API, print() itself triggers
execution of the job. By the time execute() runs, no new data sinks have been defined since
that execution, so there is nothing left to run, and that is why the exception is raised.
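
For illustration, here is a minimal sketch of the corrected program, assuming the rest of your job stays as posted. It drops the trailing execute() call and lets print() run the job. Note one assumption on my side: I typed buildingManager as String because the sample rows show values such as M1, whereas your post declares it as Int.

```scala
import org.apache.flink.api.scala._

// Assumption: buildingManager is a String here because the sample rows
// contain values like "M1"; the original post declares it as Int.
case class BuildingInformation(buildingID: Int, buildingManager: String,
                               buildingAge: Int, productID: String, country: String)

object HVACReadingsAnalysis {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // ignoreFirstLine = true skips the CSV header row.
    val buildings = env.readCsvFile[BuildingInformation](
      "./SensorFiles/building.csv",
      ignoreFirstLine = true
    )

    // print() is an eager operation: it triggers execution of the job
    // and writes the records to standard out.
    buildings.print()

    // No env.execute() here. Calling it now would fail with
    // "No new data sinks have been defined since the last execution".
  }
}
```

If you later add a lazy sink such as writeAsText(...) instead of print(), you do need env.execute() to run the job.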

Regards,
Chiwan Park

> On Apr 27, 2016, at 6:35 PM, nsengupta <sengupta.nirmalya@gmail.com> wrote:
> 
> Till,
> 
> Thanks for looking into this.
> 
> I have removed the toList() from the collect() function, to align the code
> with what I generally do in a Flink application. It throws an Exception, and
> I can't figure out why.
> 
> *Here's my code (shortened for brevity):*
> 
> case class BuildingInformation(buildingID: Int, buildingManager: Int, buildingAge: Int, productID: String, country: String)
> 
> object HVACReadingsAnalysis {
> 
>   def main(args: Array[String]): Unit = {
> 
>     val envDefault = ExecutionEnvironment.getExecutionEnvironment
> 
>     val buildings = readBuildingInfo(envDefault, "./SensorFiles/building.csv")
> 
>     buildings.print
> 
>     envDefault.execute("HVAC Simulation")
>   }
> 
>   private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {
> 
>     // [NS]: I can see the lines, read correctly from the CSV file here
>     println("As read from CSV file")
>     println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))
> 
>     // [NS]: Then, I read the same file using the library function
>     env.readCsvFile[BuildingInformation](
>       inputPath,
>       ignoreFirstLine = true,
>       pojoFields = Array("buildingID", "buildingManager", "buildingAge", "productID", "country")
>     )
>   }
> }
> 
> 
> *Relevant portion of the output:*
> As read from CSV file
> BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
> 1,M1,25,AC1000,USA#
> 2,M2,27,FN39TG,France#
> 3,M3,28,JDNS77,Brazil#
> 4,M4,17,GG1919,Finland#
> 5,M5,3,ACMAX22,Hong Kong#
> 6,M6,9,AC1000,Singapore#
> 7,M7,13,FN39TG,South Africa#
> 8,M8,25,JDNS77,Australia#
> 9,M9,11,GG1919,Mexico#
> 10,M10,23,ACMAX22,China#
> 11,M11,14,AC1000,Belgium#
> 12,M12,26,FN39TG,Finland#
> 13,M13,25,JDNS77,Saudi Arabia#
> 14,M14,17,GG1919,Germany#
> 15,M15,19,ACMAX22,Israel#
> 16,M16,23,AC1000,Turkey#
> 17,M17,11,FN39TG,Egypt#
> 18,M18,25,JDNS77,Indonesia#
> 19,M19,14,GG1919,Canada#
> 20,M20,19,ACMAX22,Argentina
> 15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> 15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
> 15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
> 
> 
> // ..
> // ... more log statements
> // ..
> 
> Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
> 	at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
> 	at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
> 	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
> 	at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
> 	at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
> 	at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> 
> Process finished with exit code 1
> 
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.

