From: Fabian Hueske
To: user@flink.incubator.apache.org
Date: Wed, 26 Nov 2014 09:34:57 +0100
Subject: Re: Program crashes trying to read JSON file

Hi Anirvan,

The CSVInputFormat works with two delimiters, a record delimiter and a field delimiter. The input data is split at the record delimiter into records. Each record is split along the field delimiter into several fields. The number and type of fields must be constant. There is no logic to handle nested data.
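For a flat (non-nested) file, the two delimiters map directly onto the 0.7-era CsvReader API. A minimal sketch, assuming a tab-separated input at a hypothetical path:

[CODE]
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class CsvDelimitersExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // record delimiter: newline; field delimiter: tab;
        // the number and types of fields are declared up front and must be constant
        DataSet<Tuple3<Integer, String, String>> records = env
                .readCsvFile("/path/to/flat/records")   // hypothetical path
                .lineDelimiter("\n")
                .fieldDelimiter('\t')
                .types(Integer.class, String.class, String.class);

        records.print();
        env.execute("csv delimiters example");
    }
}
[/CODE]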
Looking at your data, I assume that the newline ('\n') character is the record delimiter. However, due to the nested structure of your data, I don't see a valid field delimiter. If you have this input:

{timestamp: 1397731764    payload: {product: Younited    uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd    platform: native    version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241    type: can-usage-v1    event: General,Login,Success}}

Splitting along the field delimiter tab ('\t') would result in 7 String fields:

- {timestamp: 1397731764
- payload: {product: Younited
- uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
- platform: native
- version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
- type: can-usage-v1
- event: General,Login,Success}}

I guess this is not exactly what you want (curly braces, field names, etc.).

The best way to handle JSON data right now is to have the records delimited by a character such as newline, read the data line-wise with the TextInputFormat, and use custom parse logic in a MapFunction. There you can also use JSON libraries (make sure they are thread-safe!).

Best, Fabian
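To make that line-wise approach concrete, here is a minimal sketch, assuming a 0.7-style Java API and the org.apache.sling.commons.json classes used elsewhere in this thread. The class name and input path are illustrative only, and the records are assumed to be strictly valid JSON (the samples in this thread use a relaxed notation):

[CODE]
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;

public class JsonLinesExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // TextInputFormat under the hood: one String per newline-delimited record
        DataSet<String> lines = env.readTextFile("/path/to/json/records"); // hypothetical path

        DataSet<Tuple2<Integer, String>> events = lines.flatMap(
                new FlatMapFunction<String, Tuple2<Integer, String>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<Integer, String>> out) {
                        try {
                            // one JSONObject per record, created inside the call,
                            // so there is no shared mutable state to worry about
                            JSONObject json = new JSONObject(line);
                            out.collect(new Tuple2<Integer, String>(
                                    json.getInt("timestamp"),
                                    json.getJSONObject("payload").getString("event")));
                        } catch (JSONException e) {
                            // drop malformed records instead of failing the job
                        }
                    }
                });

        events.print();
        env.execute("json lines example");
    }
}
[/CODE]

With records shaped like the samples above, the same pattern extends naturally to a Tuple4 carrying a constant 1 for counting.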
2014-11-26 9:01 GMT+01:00 Anirvan BASU:

> Ciao Stefano !
>
> Thanks for this early morning information, very helpful.
> Yes, for outputting the data we are using writeAsCsv, which is stable over
> different versions of Flink.
>
> Our current concern is "reading" a JSON file into a dataset.
> As you can see, we have a simple 2-level JSON hierarchy that can be easily
> mapped to a fixed-column CSV.
> But the place we are stuck at currently is in reading the file correctly
> into a tuple-based dataset in memory.
> Once this is achieved, the rest will be fairly simple dataset
> transformations.
>
> As you can see from the pasted code, we used functions developed from the
> stream connector for our purposes. (Thanks to Gyula and Marton for that
> information.)
>
> If reading a JSON file using functions already developed is not possible,
> then we will have to develop some custom functions based on hardcore string
> operations to do the same.
> That would be like reinventing the wheel ... :-((
>
> Any advice in this regard will be highly appreciated.
>
> Thanks in advance to all,
> Anirvan
>
> ------------------------------
>
> From: "Stefano Bortoli"
> To: "user"
> Sent: Wednesday, November 26, 2014 8:37:59 AM
> Subject: Re: Program crashes trying to read JSON file
>
> You can output your results in different ways. If all you need is to write
> a file, I normally use the writeAsText method (however, there are also
> writeAsCsv and writeAsFormattedText), or write according to your custom
> FileOutputFormat.
>
> datasetToPrint.writeAsText("/path/to/file/with/permission",
> WriteMode.OVERWRITE);
>
> Keep in mind that this will output your tuple dataset. Therefore, if you
> want to shape your output differently, it may be necessary to do further
> processing.
>
> saluti,
> Stefano
>
> 2014-11-25 22:04 GMT+01:00 Anirvan BASU:
>
>> Thanks to Aljoscha and Stefano for pointing out the flaw.
>>
>> We corrected the issue as follows:
>>
>> [CODE]
>>
>> import org.apache.flink.api.java.tuple.Tuple4;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>> import org.apache.sling.commons.json.JSONException;
>> ...
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if(!parseParameters(args)) {
>>             return;
>>         }
>>
>>         // set up the execution environment
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>         // get input data
>>         DataSet<String> text = getTextDataSet(env);
>>
>>         DataSet<Tuple4<Integer, String, String, Integer>> counts =
>>                 // split up the lines into 4-tuples containing: (timestamp,uuid,event,count)
>>                 text.flatMap(new SelectDataFlatMap())
>>                 // group by the tuple field "1" (an event - string) and sum up tuple field "3" (integer - value 1)
>>                 .groupBy(1)
>>                 .sum(3);
>>
>>         // emit result
>>         if(fileOutput) {
>>             counts.writeAsCsv(outputPath, "\n", " ");
>>         } else {
>>             counts.print();
>>         }
>>
>>         // execute program
>>         env.execute("Weblogs Programme");
>>     }
>>
>> ...
>>
>>     public static class SelectDataFlatMap extends
>>             JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
>>                 throws Exception {
>>             try {
>>                 record.collect(new Tuple4<Integer, String, String, Integer>(
>>                         getInt(value, "timestamp"),
>>                         getString(value, "uuid"),
>>                         getString(value, "event"),
>>                         1));
>>             } catch (JSONException e) {
>>                 System.err.println("Field not found");
>>             }
>>         }
>>     }
>>
>> [/CODE]
>>
>> However, this time the issue was different.
>> The programme executed correctly till status FINISHED.
>> However, there was no output :-((
>> i.e. for each Task Manager, an empty file is written.
>>
>> When we checked further about the input text file that is read using
>> env.readTextFile(), we find that instead of a text string (the full text
>> dataset) only a small string is written!
>> Something such as:
>> org.apache.flink.api.java.operators.DataSource@6bd8b476
>>
>> Worse still! This string value sometimes remains the same over multiple
>> runs of the programme ....
>> Is this natural? Is this just the handle to the file or the dataset?
>> Is the Collector() working correctly also?
>>
>>
>> Note:
>> The actual JSON file (i.e. the text file that should be read) is of the
>> following nature, with a 2-level hierarchy for one field:
>> [JSON]
>> {timestamp: 1397731764    payload: {product: Younited    uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd    platform: native    version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241    type: can-usage-v1    event: General,Login,Success}}
>> {timestamp: 1397731765    payload: {product: Younited    uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e    platform: native    version: 7b4b767060b62537b63c5d10d911870a14d2b84e    type: can-usage-v1    event: General,App,Opened}}
>> [/JSON]
>>
>>
>> So now again, we are confused if we are doing it correctly :-((
>>
>> Thanks in advance for helping us to understand where we are going wrong.
>> Anirvan
>>
>> ------------------------------
>>
>> From: "Stefano Bortoli"
>> To: "user"
>> Cc: dev@flink.incubator.apache.org
>> Sent: Tuesday, November 25, 2014 5:05:34 PM
>> Subject: Re: Program crashes trying to read JSON file
>>
>> Very quickly, it seems you are trying to sum on Strings:
>>
>> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type
>> java.lang.String has currently not supported for built-in sum aggregations.
>>
>> Check your tuple types and be sure that you are not summing on strings.
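The exception Stefano quotes can be reproduced in isolation: sum() is only defined for numeric tuple fields, so summing a String field fails at plan-construction time. A minimal, self-contained illustration with hypothetical sample data:

[CODE]
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SumOnStringsDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // hypothetical (event, count) pairs: String key, Integer value
        DataSet<Tuple2<String, Integer>> events = env.fromElements(
                new Tuple2<String, Integer>("General,Login,Success", 1),
                new Tuple2<String, Integer>("General,App,Opened", 1),
                new Tuple2<String, Integer>("General,Login,Success", 1));

        // OK: group on the String field (0), sum the Integer field (1)
        events.groupBy(0).sum(1).print();

        // Would throw UnsupportedAggregationTypeException, because field 0 is a String:
        // events.groupBy(0).sum(0);

        env.execute("sum on strings demo");
    }
}
[/CODE]

The corrected Tuple4 version above follows exactly this pattern: the constant 1 in field 3 gives sum() a numeric field to aggregate.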
>>
>> 2014-11-25 16:55 GMT+01:00 Anirvan BASU:
>>
>>> Hello all,
>>>
>>> We are using Flink 0.7 and trying to read a large JSON file, reading
>>> some fields into a Flink (3-tuple based) dataset, then performing some
>>> operations.
>>>
>>> We encountered the following runtime error:
>>>
>>> [QUOTE]
>>> Error: The main method caused an error.
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>>     at org.apache.flink.client.program.Client.run(Client.java:244)
>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
>>>     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>>>     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>>>     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>>>     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>>>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>>     ... 6 more
>>> [/QUOTE]
>>>
>>>
>>> The code snippet that could have caused this error (i.e. that we edited)
>>> is the following:
>>>
>>> [CODE]
>>>
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.util.Collector;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>>> import org.apache.sling.commons.json.JSONException;
>>> ...
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         if(!parseParameters(args)) {
>>>             return;
>>>         }
>>>
>>>         // set up the execution environment
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         // get input data
>>>         DataSet<String> text = getTextDataSet(env);
>>>
>>>         DataSet<Tuple3<Integer, String, String>> counts =
>>>                 // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
>>>                 text.flatMap(new SelectDataFlatMap())
>>>                 // group by the tuple field "0" and sum up tuple field "1"
>>>                 .groupBy(2)
>>>                 .sum(2);
>>>
>>>         // emit result
>>>         if(fileOutput) {
>>>             counts.writeAsCsv(outputPath, "\n", " ");
>>>         } else {
>>>             counts.print();
>>>         }
>>>
>>>         // execute program
>>>         env.execute("Weblogs Programme");
>>>     }
>>>
>>> ...
>>>
>>>     public static class SelectDataFlatMap extends
>>>             JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>>>
>>>         @Override
>>>         public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
>>>                 throws Exception {
>>>             try {
>>>                 out.collect(new Tuple3<Integer, String, String>(
>>>                         getInt(value, "timestamp"),
>>>                         getString(value, "uuid"),
>>>                         getString(value, "event")));
>>>             } catch (JSONException e) {
>>>                 System.err.println("Field not found");
>>>             }
>>>         }
>>>     }
>>>
>>> [/CODE]
>>>
>>>
>>> The JSON file is of the following nature, with a 2-level hierarchy for
>>> one field:
>>> [JSON]
>>> {timestamp: 1397731764    payload: {product: Younited    uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd    platform: native    version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241    type: can-usage-v1    event: General,Login,Success}}
>>> {timestamp: 1397731765    payload: {product: Younited    uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e    platform: native    version: 7b4b767060b62537b63c5d10d911870a14d2b84e    type: can-usage-v1    event: General,App,Opened}}
>>> [/JSON]
>>>
>>>
>>> Thanks in advance for helping us to understand where we are going wrong.
>>>
>>> Anirvan
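On Anirvan's earlier handle-vs-dataset question: the string org.apache.flink.api.java.operators.DataSource@6bd8b476 is Java's default Object.toString() of the lazy DataSet handle, not the file contents, which only materialize through a sink when the plan runs. A small sketch of the distinction, with hypothetical paths:

[CODE]
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class HandleVsContents {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // readTextFile() is lazy: it returns a DataSource operator handle
        // and does not read any data yet
        DataSet<String> text = env.readTextFile("/path/to/input"); // hypothetical path

        // printing the handle yields the default Object.toString(),
        // e.g. "org.apache.flink.api.java.operators.DataSource@6bd8b476"
        System.out.println(text);

        // the contents are written only when the plan is executed
        text.writeAsText("/path/to/output"); // hypothetical path
        env.execute("handle vs contents");
    }
}
[/CODE]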