flink-user mailing list archives

From Anirvan BASU <anirvan.b...@inria.fr>
Subject Re: Program crashes trying to read JSON file
Date Tue, 25 Nov 2014 21:04:28 GMT
Thanks to Aljoscha and Stefano for pointing out the flaw. 

We corrected the issue as follows: 

[CODE] 

import org.apache.flink.api.java.tuple.Tuple4; 
import org.apache.flink.util.Collector; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; 
import org.apache.sling.commons.json.JSONException; 
... 

public static void main(String[] args) throws Exception { 

    if (!parseParameters(args)) { 
        return; 
    } 

    // set up the execution environment 
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

    // get input data 
    DataSet<String> text = getTextDataSet(env); 

    DataSet<Tuple4<Integer, String, String, Integer>> counts = 
        // split up the lines into 4-tuples containing: (timestamp, uuid, event, count) 
        text.flatMap(new SelectDataFlatMap()) 
            // group by tuple field 1 and sum up tuple field 3 (the integer count, initialised to 1) 
            .groupBy(1) 
            .sum(3); 

    // emit result 
    if (fileOutput) { 
        counts.writeAsCsv(outputPath, "\n", " "); 
    } else { 
        counts.print(); 
    } 

    // execute program 
    env.execute("Weblogs Programme"); 
} 

... 

public static class SelectDataFlatMap extends 
        JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> { 

    private static final long serialVersionUID = 1L; 

    @Override 
    public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record) 
            throws Exception { 
        try { 
            record.collect(new Tuple4<Integer, String, String, Integer>( 
                    getInt(value, "timestamp"), 
                    getString(value, "uuid"), 
                    getString(value, "event"), 
                    1)); 
        } catch (JSONException e) { 
            System.err.println("Field not found"); 
        } 
    } 
} 

[/CODE] 
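For readers following along, the groupBy(...).sum(...) pipeline above is the classic word-count aggregation: each record carries an explicit integer count of 1, so the built-in sum has a numeric field to work on. A minimal plain-Java sketch of the same aggregation over the event field (the sample rows and the countEvents helper are hypothetical, not Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupSumDemo {

    // Hypothetical sample rows mirroring the (timestamp, uuid, event, count) tuples.
    static final Object[][] SAMPLE = {
        {1397731764, "uuid-a", "General,Login,Success", 1},
        {1397731765, "uuid-b", "General,App,Opened", 1},
        {1397731766, "uuid-c", "General,Login,Success", 1},
    };

    // Group by the event field (index 2) and sum the count field (index 3):
    // conceptually what a grouped sum computes over the flat-mapped tuples.
    static Map<String, Integer> countEvents(Object[][] rows) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Object[] row : rows) {
            sums.merge((String) row[2], (Integer) row[3], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(countEvents(SAMPLE));
        // prints {General,Login,Success=2, General,App,Opened=1}
    }
}
```

The essential difference from the earlier failing version is that the summed field is an Integer, not a String, which is exactly what the UnsupportedAggregationTypeException was complaining about.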

However, this time the issue was different. 
The programme executed correctly through to status FINISHED. 
However, there was no output :-(( 
i.e. for each Task Manager, an empty file was written. 

When we checked further into the input text file that is read using env.readTextFile(), we 
found that instead of the text string (the full text dataset), only a small string is written! 
Something like: 
org.apache.flink.api.java.operators.DataSource@6bd8b476 

Worse still, this string value sometimes remains the same over multiple runs of the programme 
.... 
Is this natural? Is this just a handle to the file or the dataset? 
Is the Collector() working correctly as well? 
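For what it's worth, a string like org.apache.flink.api.java.operators.DataSource@6bd8b476 is the default Java Object.toString() of the DataSource operator itself (class name plus hash code), not the file contents; it is what appears if the DataSet object is printed or written directly rather than executed as part of the plan. A self-contained sketch of the same effect, using a plain stand-in class (DataSourceLike is hypothetical, not a Flink type):

```java
import java.util.Arrays;
import java.util.List;

// A plain stand-in for Flink's DataSource operator: a class that does not
// override toString(), so printing it shows the default Object.toString().
class DataSourceLike {
    private final List<String> lines;
    DataSourceLike(List<String> lines) { this.lines = lines; }
    List<String> getLines() { return lines; }
}

public class ToStringDemo {
    public static void main(String[] args) {
        DataSourceLike text = new DataSourceLike(Arrays.asList("line one", "line two"));
        // Printing the operator object itself yields "ClassName@hexHashCode",
        // much like org.apache.flink...DataSource@6bd8b476 -- a handle, not data.
        System.out.println(text);
        // The actual contents are only visible by iterating over them.
        for (String line : text.getLines()) {
            System.out.println(line);
        }
    }
}
```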

Note : 
The actual JSON file (i.e. the text file that should be read) is of the following nature,
with a 2-level hierarchy for one field: 
[JSON] 
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}} 
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}} 
[/JSON] 
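One thing worth noting: as pasted, these records use unquoted keys and no commas, so they are not strictly valid JSON, and a strict parser behind JSONParseFlatMap may well fail on them (which would explain the "Field not found" path being taken). A sketch of what a well-formed rendering of the first record could look like, built as a plain Java string; whether the real input file is shaped like this is an assumption:

```java
public class JsonShapeDemo {

    // Hypothetical well-formed rendering of the first record: keys and string
    // values quoted, fields separated by commas. The snippet as pasted in the
    // mail is not strictly valid JSON.
    static final String RECORD =
        "{\"timestamp\": 1397731764, \"payload\": {"
        + "\"product\": \"Younited\", "
        + "\"uuid\": \"754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd\", "
        + "\"platform\": \"native\", "
        + "\"version\": \"6aa54aca95fb1b8ef2290136ab12df2e4b011241\", "
        + "\"type\": \"can-usage-v1\", "
        + "\"event\": \"General,Login,Success\"}}";

    public static void main(String[] args) {
        System.out.println(RECORD);
    }
}
```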

So now, again, we are confused about whether we are doing this correctly :-(( 

Thanks in advance for helping us to understand where we are going wrong. 
Anirvan 

----- Original Message -----

> From: "Stefano Bortoli" <s.bortoli@gmail.com>
> To: "user" <user@flink.incubator.apache.org>
> Cc: dev@flink.incubator.apache.org
> Sent: Tuesday, November 25, 2014 5:05:34 PM
> Subject: Re: Program crashes trying to read JSON file

> Very quickly, it seems you are trying to sum on Strings

> Caused by: org.apache.flink.api.java.
> aggregation.UnsupportedAggregationTypeException: The type java.lang.String
> has currently not supported for built-in sum aggregations.

> Check your tuple types and be sure that you are not summing on strings.

> 2014-11-25 16:55 GMT+01:00 Anirvan BASU <anirvan.basu@inria.fr>:

> > Hello all,
> 

> > We are using Flink 0.7 and trying to read a large JSON file, reading some
> > fields into a flink (3-tuple based) dataset, then performing some
> > operations.
> 

> > We encountered the following runtime error:
> 

> > [QUOTE]
> > Error: The main method caused an error.
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> > at org.apache.flink.client.program.Client.run(Client.java:244)
> > at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> > at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
> > at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> > at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> > at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> > at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> > at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:606)
> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> > ... 6 more
> > [/QUOTE]
> 
> 

> > The code snippet that could have caused this error (i.e. that we edited) is
> > the following
> 

> > [CODE]
> > import org.apache.flink.api.java.tuple.Tuple3;
> > import org.apache.flink.util.Collector;
> > import org.apache.flink.api.java.DataSet;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > import org.apache.sling.commons.json.JSONException;
> > ...
> >
> > public static void main(String[] args) throws Exception {
> >
> >     if (!parseParameters(args)) {
> >         return;
> >     }
> >
> >     // set up the execution environment
> >     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >
> >     // get input data
> >     DataSet<String> text = getTextDataSet(env);
> >
> >     DataSet<Tuple3<Integer, String, String>> counts =
> >         // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
> >         text.flatMap(new SelectDataFlatMap())
> >             // group by the tuple field "0" and sum up tuple field "1"
> >             .groupBy(2)
> >             .sum(2);
> >
> >     // emit result
> >     if (fileOutput) {
> >         counts.writeAsCsv(outputPath, "\n", " ");
> >     } else {
> >         counts.print();
> >     }
> >
> >     // execute program
> >     env.execute("Weblogs Programme");
> > }
> >
> > ...
> >
> > public static class SelectDataFlatMap extends
> >         JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
> >
> >     @Override
> >     public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
> >             throws Exception {
> >         try {
> >             out.collect(new Tuple3<Integer, String, String>(
> >                     getInt(value, "timestamp"),
> >                     getString(value, "uuid"),
> >                     getString(value, "event")));
> >         } catch (JSONException e) {
> >             System.err.println("Field not found");
> >         }
> >     }
> > }
> >
> > [/CODE]
> 
> 

> 

> > The JSON file is of the following nature, with a 2-level hierarchy for one
> > field:
> 
> > [JSON]
> > {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
> > {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
> > [/JSON]
> 

> > Thanks in advance for helping us to understand where we are going wrong.
> 

> > Anirvan
> 
