From: Stefano Bortoli <s.bortoli@gmail.com>
Date: Wed, 26 Nov 2014 08:37:59 +0100
Subject: Re: Program crashes trying to read JSON file
To: user@flink.incubator.apache.org
You can output your results in different ways. If all you need is to write
a file, I normally use the writeAsText method (however, there are also
writeAsCsv and writeAsFormattedText), or write according to your custom
FileOutputFormat.

datasetToPrint.writeAsText("/path/to/file/with/permission", WriteMode.OVERWRITE);

Keep in mind that this will output your tuple dataset. Therefore, if you
want to shape your output differently, it may be necessary to do further
processing.

saluti,
Stefano

2014-11-25 22:04 GMT+01:00 Anirvan BASU <anirvan.basu@inria.fr>:
> Thanks to Aljoscha and Stefano for pointing out the flaw.
>
> We corrected the issue as follows:
>
> [CODE]
>
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> import org.apache.sling.commons.json.JSONException;
> ...
>
>     public static void main(String[] args) throws Exception {
>
>         if(!parseParameters(args)) {
>             return;
>         }
>
>         // set up the execution environment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // get input data
>         DataSet<String> text = getTextDataSet(env);
>
>         DataSet<Tuple4<Integer, String, String, Integer>> counts =
>                 // split up the lines in 4-tuples containing: (timestamp,uuid,event,count)
>                 text.flatMap(new SelectDataFlatMap())
>                 // group by the tuple field "1" (an event - string) and sum up tuple field "3" (integer - value 1)
>                 .groupBy(1)
>                 .sum(3);
>
>         // emit result
>         if(fileOutput) {
>             counts.writeAsCsv(outputPath, "\n", " ");
>         } else {
>             counts.print();
>         }
>
>         // execute program
>         env.execute("Weblogs Programme");
>     }
>
> ...
>
>     public static class SelectDataFlatMap extends
>             JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
>
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
>                 throws Exception {
>             try {
>                 record.collect(new Tuple4<Integer, String, String, Integer>(
>                         getInt(value, "timestamp"),
>                         getString(value, "uuid"),
>                         getString(value, "event"),
>                         1));
>             } catch (JSONException e) {
>                 System.err.println("Field not found");
>             }
>         }
>     }
>
> [/CODE]
>
> However, this time the issue was different.
> The programme executed correctly till status FINISHED.
> However, there was no output :-((
> i.e. for each Task Manager, an empty file is written.
>
> When we checked further into the input text file that is read using
> env.readTextFile(), we found that instead of a text string (full text
> dataset) only a small string is written!
> Something like:
> org.apache.flink.api.java.operators.DataSource@6bd8b476
>
> Worse still, this string value sometimes remains the same over multiple
> runs of the programme ....
> Is this natural? Is this just the handle to the file or the dataset?
> Is the Collector() working correctly also?
>
> Note:
> The actual JSON file (i.e. the text file that should be read) is of the
> following nature, with a 2-level hierarchy for one field:
> [JSON]
> {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
> {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
> [/JSON]
>
> So now again, we are confused if we are doing it correctly :-((
>
> Thanks in advance for helping us to understand where we are going wrong.
> Anirvan
>
> ------------------------------
>
> From: "Stefano Bortoli" <s.bortoli@gmail.com>
> To: "user" <user@flink.incubator.apache.org>
> Cc: dev@flink.incubator.apache.org
> Sent: Tuesday, November 25, 2014 5:05:34 PM
> Subject: Re: Program crashes trying to read JSON file
>
> Very quickly, it seems you are trying to sum on Strings:
>
> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
>
> Check your tuple types and be sure that you are not summing on strings.
>
> 2014-11-25 16:55 GMT+01:00 Anirvan BASU <anirvan.basu@inria.fr>:
>
>> Hello all,
>>
>> We are using Flink 0.7 and trying to read a large JSON file, reading some
>> fields into a flink (3-tuple based) dataset, then performing some
>> operations.
>>
>> We encountered the following runtime error:
>>
>> [QUOTE]
>> Error: The main method caused an error.
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>> at org.apache.flink.client.program.Client.run(Client.java:244)
>> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
>> The type java.lang.String has currently not supported for built-in sum
>> aggregations.
>> at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>> at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>> at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>> at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>> at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>> ... 6 more
>> [/QUOTE]
>>
>> The code snippet that could have caused this error (i.e. that we edited)
>> is the following:
>>
>> [CODE]
>>
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>> import org.apache.sling.commons.json.JSONException;
>> ...
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if(!parseParameters(args)) {
>>             return;
>>         }
>>
>>         // set up the execution environment
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>         // get input data
>>         DataSet<String> text = getTextDataSet(env);
>>
>>         DataSet<Tuple3<Integer, String, String>> counts =
>>                 // split up the lines in 3-tuples containing: (timestamp,uuid,event)
>>                 text.flatMap(new SelectDataFlatMap())
>>                 // group by the tuple field "0" and sum up tuple field "1"
>>                 .groupBy(2)
>>                 .sum(2);
>>
>>         // emit result
>>         if(fileOutput) {
>>             counts.writeAsCsv(outputPath, "\n", " ");
>>         } else {
>>             counts.print();
>>         }
>>
>>         // execute program
>>         env.execute("Weblogs Programme");
>>     }
>>
>> ...
>>
>>     public static class SelectDataFlatMap extends
>>             JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>>
>>         @Override
>>         public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
>>                 throws Exception {
>>             try {
>>                 out.collect(new Tuple3<Integer, String, String>(
>>                         getInt(value, "timestamp"),
>>                         getString(value, "uuid"),
>>                         getString(value, "event")));
>>             } catch (JSONException e) {
>>                 System.err.println("Field not found");
>>             }
>>         }
>>     }
>>
>> [/CODE]
>>
>> The JSON file is of the following nature, with a 2-level hierarchy for
>> one field:
>> [JSON]
>> {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
>> {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
>> [/JSON]
>>
>> Thanks in advance for helping us to understand where we are going wrong.
>>
>> Anirvan
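
[Editor's note] The aggregation pattern the thread converges on (key by the event string, sum a separate integer field initialized to 1, never the string itself) can be sketched in plain Java without the Flink runtime. This is an illustrative sketch only: the class name `EventCount`, the helper `extractEvent`, and the shortened sample lines (simplified from the JSON fragments quoted above) are all hypothetical, not part of the original code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EventCount {

    // Pull the value of the "event:" field out of a loosely formatted record
    // line (a simplification of the JSON-ish format quoted in the thread).
    static String extractEvent(String line) {
        int i = line.indexOf("event:");
        if (i < 0) return null;
        // The event value runs to the closing braces.
        return line.substring(i + "event:".length()).replace("}", "").trim();
    }

    // Group by the event string and sum an integer count of 1 per record.
    // Summing the integer field is what groupBy(1).sum(3) does in the
    // corrected Tuple4 version; summing the string field (sum(2) on the
    // Tuple3 version) is what raised UnsupportedAggregationTypeException.
    static Map<String, Integer> countEvents(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String event = extractEvent(line);
            if (event != null) {
                counts.merge(event, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "{timestamp: 1397731764 payload: {product: Younited type: can-usage-v1 event: General,Login,Success}}",
            "{timestamp: 1397731765 payload: {product: Younited type: can-usage-v1 event: General,App,Opened}}",
            "{timestamp: 1397731766 payload: {product: Younited type: can-usage-v1 event: General,App,Opened}}");
        System.out.println(countEvents(lines));
        // prints {General,Login,Success=1, General,App,Opened=2}
    }
}
```

The same shape carries over to the Flink job: keep the grouping key a string field, but always point `sum()` at a numeric tuple field.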