flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anirvan BASU <anirvan.b...@inria.fr>
Subject Program crashes trying to read JSON file
Date Tue, 25 Nov 2014 15:55:17 GMT
Hello all, 

We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a flink
(3-tuple based) dataset, then performing some operations. 

We encountered the following runtime error: 

[QUOTE] 
Error: The main method caused an error. 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)

at org.apache.flink.client.program.Client.run(Client.java:244) 
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347) 
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334) 
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001) 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025) 
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The
type java.lang.String has currently not supported for built-in sum aggregations. 
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)

at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)

at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)

at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72) 
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)

... 6 more 
[/QUOTE] 



The code snippet that could have caused this error (i.e. that we edited) is the following


[CODE] 

import org.apache.flink.api.java.tuple.Tuple3; 
import org.apache.flink.util.Collector; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; 
import org.apache.sling.commons.json.JSONException; 
... 

public static void main(String[] args) throws Exception { 

if(!parseParameters(args)) { 
return; 
} 

// set up the execution environment 
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

// get input data 
DataSet<String> text = getTextDataSet(env); 

DataSet<Tuple3<Integer, String, String>> counts = 
// split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event) 
text.flatMap(new SelectDataFlatMap() ) 
// group by the tuple field "0" and sum up tuple field "1" 
.groupBy(2) 
.sum(2); 

// emit result 
if(fileOutput) { 
counts.writeAsCsv(outputPath, "\n", " "); 
} else { 
counts.print(); 
} 

// execute program 
env.execute("Weblogs Programme"); 
} 

... 

public static class SelectDataFlatMap extends 
JSONParseFlatMap<String, Tuple3<Integer, String, String>> { 

@Override 
public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)

throws Exception { 
try { 
out.collect(new Tuple3<Integer, String, String>( 
getInt(value, "timestamp"), 
getString(value, "uuid"), 
getString(value, "event"))); 
} catch (JSONException e) { 
System.err.println("Field not found"); 
} 
} 
} 

[/CODE] 



[QUOTE] 
Error: The main method caused an error. 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)

at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)

at org.apache.flink.client.program.Client.run(Client.java:244) 
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347) 
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334) 
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001) 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025) 
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The
type java.lang.String has currently not supported for built-in sum aggregations. 
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)

at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)

at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)

at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72) 
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)

... 6 more 
[/QUOTE] 


The JSON file is of the following nature, with a 2-level hierarchy for one field: 
[JSON] 
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event:
General,Login,Success}} 
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event:
General,App,Opened}} 
[/JSON] 



Thanks in advance for helping us to understand where we are going wrong. 

Anirvan 

Mime
View raw message