flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject Re: Program crashes trying to read JSON file
Date Wed, 26 Nov 2014 07:37:59 GMT
You can output your results in different ways. If all you need is to write
a file, I normally use the writeAsText method (however, there are also
writeAsCsv and writeAsFormattedText, or you can write according to your
custom FileOutputFormat).

datasetToPrint.writeAsText("/path/to/file/with/permission",
WriteMode.OVERWRITE);

Keep in mind that this will output your tuple dataset as-is. Therefore, if
you want to shape your output differently, further processing may be
necessary.

saluti,
Stefano

2014-11-25 22:04 GMT+01:00 Anirvan BASU <anirvan.basu@inria.fr>:

> Thanks to Aljoscha and Stefano for pointing out the flaw.
>
> We corrected the issue as follows:
>
> [CODE]
>
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> import org.apache.sling.commons.json.JSONException;
> ...
>
>     public static void main(String[] args) throws Exception {
>
>         if(!parseParameters(args)) {
>             return;
>         }
>
>         // set up the execution environment
>         final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
>         // get input data
>         DataSet<String> text = getTextDataSet(env);
>
>         DataSet<Tuple4<Integer, String, String, Integer>> counts =
>                 // split up the lines into 4-tuples containing:
>                 // (timestamp, uuid, event, count)
>                 text.flatMap(new SelectDataFlatMap())
>                 // group by the tuple field "1" (an event - string) and
>                 // sum up tuple field "3" (integer - value 1)
>                 .groupBy(1)
>                 .sum(3);
>
>
>         // emit result
>         if(fileOutput) {
>             counts.writeAsCsv(outputPath, "\n", " ");
>         } else {
>             counts.print();
>         }
>
>         // execute program
>         env.execute("Weblogs Programme");
>     }
>
> ...
>
>     public static class SelectDataFlatMap extends
>     JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
>
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void flatMap(String value, Collector<Tuple4<Integer,
> String, String, Integer>> record)
>                 throws Exception {
>             try {
>                 record.collect(new Tuple4<Integer, String, String,
> Integer>(
>                         getInt(value, "timestamp"),
>                         getString(value, "uuid"),
>                         getString(value, "event"),
>                         1));
>             } catch (JSONException e) {
>                 System.err.println("Field not found");
>             }
>         }
>     }
>
>
> [/CODE]
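As a sanity check on what a groupBy/sum over such 4-tuples computes, here
is a plain-Java sketch (outside Flink, record layout simplified to a
key/count pair): grouping on a string key and summing the trailing count
of 1 per record yields a count per distinct key.

```java
import java.util.*;

public class GroupSumSketch {
    // Plain-Java stand-in for groupBy(<string field>).sum(<count field>):
    // sum the counts of all records sharing the same key string.
    static Map<String, Integer> groupSum(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            sums.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Three records, each carrying a count of 1, two sharing an event key.
        List<Map.Entry<String, Integer>> recs = List.of(
                Map.entry("General,Login,Success", 1),
                Map.entry("General,App,Opened", 1),
                Map.entry("General,Login,Success", 1));
        System.out.println(groupSum(recs));
        // prints: {General,Login,Success=2, General,App,Opened=1}
    }
}
```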
>
> However, this time the issue was different:
> the programme executed correctly, reaching status FINISHED,
> but there was no output :-((
> i.e. for each Task Manager, an empty file is written.
>
> When we checked further into the input text file that is read using
> env.readTextFile(), we find that instead of a text string (the full text
> dataset), only a small string is written!
> Something like:
> org.apache.flink.api.java.operators.DataSource@6bd8b476
>
> Worse still, this string value sometimes remains the same over multiple
> runs of the programme ...
> Is this natural? Is this just the handle to the file or the dataset?
> Is the Collector() also working correctly?
>
>
> Note :
> The actual JSON file (i.e. the text file that should be read) is of the
> following nature, with a 2-level hierarchy for one field:
> [JSON]
> {timestamp: 1397731764     payload: {product:
> Younited     uuid:
> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>  platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>  type: can-usage-v1     event: General,Login,Success}}
> {timestamp: 1397731765     payload: {product:
> Younited     uuid:
> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>  platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>  type: can-usage-v1     event: General,App,Opened}}
> [/JSON]
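Since these records are not strict JSON (unquoted keys and values,
whitespace-separated fields), extracting the three fields of interest can
be sketched in plain Java with a regular expression. This is a
hypothetical stand-in for illustration, not the sling JSON helpers
(getInt/getString) the program actually uses:

```java
import java.util.regex.*;

public class LogFieldSketch {
    // Hypothetical extractor for the whitespace-separated records above:
    // find "key:" and return the value up to the next space or brace.
    static String field(String line, String key) {
        Matcher m = Pattern.compile(key + ":\\s*([^\\s}]+)").matcher(line);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String rec = "{timestamp: 1397731764     payload: {product: Younited     "
                   + "uuid: 754726549cec3968a60f     event: General,Login,Success}}";
        System.out.println(field(rec, "timestamp")); // 1397731764
        System.out.println(field(rec, "event"));     // General,Login,Success
    }
}
```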
>
>
> So now again, we are confused if we are doing it correctly :-((
>
> Thanks in advance for helping us to understand where we are going wrong.
> Anirvan
>
> ------------------------------
>
> *From: *"Stefano Bortoli" <s.bortoli@gmail.com>
> *To: *"user" <user@flink.incubator.apache.org>
> *Cc: *dev@flink.incubator.apache.org
> *Sent: *Tuesday, November 25, 2014 5:05:34 PM
> *Subject: *Re: Program crashes trying to read JSON file
>
>
> Very quickly: it seems you are trying to sum on Strings.
>
> Caused by: org.apache.flink.api.java.
> aggregation.UnsupportedAggregationTypeException: The type java.lang.String
> has currently not supported for built-in sum aggregations.
>
> Check your tuple types and be sure that you are not summing on strings.
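A minimal illustration of the failure mode (plain Java, not Flink's
actual type machinery): sum(i) needs a numeric field, but in
Tuple3&lt;Integer, String, String&gt; position 2 holds a String, so asking to
sum it cannot work.

```java
public class SumFieldCheck {
    // Toy check mirroring the constraint behind the exception:
    // a field is summable only if its type is numeric.
    static boolean summable(Class<?>[] fieldTypes, int pos) {
        return Number.class.isAssignableFrom(fieldTypes[pos]);
    }

    public static void main(String[] args) {
        // Field types of the Tuple3<Integer, String, String> from the code below.
        Class<?>[] tuple3 = { Integer.class, String.class, String.class };
        System.out.println(summable(tuple3, 2)); // false: a String cannot be summed
        System.out.println(summable(tuple3, 0)); // true: Integer is numeric
    }
}
```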
>
>
> 2014-11-25 16:55 GMT+01:00 Anirvan BASU <anirvan.basu@inria.fr>:
>
>> Hello all,
>>
>> We are using Flink 0.7 and trying to read a large JSON file, reading some
>> fields into a Flink (3-tuple based) dataset, then performing some
>> operations.
>>
>> We encountered the following runtime error:
>>
>> [QUOTE]
>> Error: The main method caused an error.
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>>     at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>     at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>     at org.apache.flink.client.program.Client.run(Client.java:244)
>>     at
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>     at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>> Caused by:
>> org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
>> The type java.lang.String has currently not supported for built-in sum
>> aggregations.
>>     at
>> org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>>     at
>> org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>>     at
>> org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>>     at
>> org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>>     at
>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>     ... 6 more
>> [/QUOTE]
>>
>>
>>
>> The code snippet that could have caused this error (i.e. that we edited)
>> is the following
>>
>> [CODE]
>>
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>> import org.apache.sling.commons.json.JSONException;
>> ...
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if(!parseParameters(args)) {
>>             return;
>>         }
>>
>>         // set up the execution environment
>>         final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>>         // get input data
>>         DataSet<String> text = getTextDataSet(env);
>>
>>         DataSet<Tuple3<Integer, String, String>> counts =
>>                 // split up the lines in pairs (3-tuples) containing:
>> (timestamp,uuid,event)
>>                 text.flatMap(new SelectDataFlatMap())
>>                 // group by the tuple field "0" and sum up tuple field "1"
>>                 .groupBy(2)
>>                 .sum(2);
>>
>>         // emit result
>>         if(fileOutput) {
>>             counts.writeAsCsv(outputPath, "\n", " ");
>>         } else {
>>             counts.print();
>>         }
>>
>>         // execute program
>>         env.execute("Weblogs Programme");
>>     }
>>
>> ...
>>
>>     public static class SelectDataFlatMap extends
>>     JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>>
>>         @Override
>>         public void flatMap(String value, Collector<Tuple3<Integer,
>> String, String>> out)
>>                 throws Exception {
>>             try {
>>                 out.collect(new Tuple3<Integer, String, String>(
>>                         getInt(value, "timestamp"),
>>                         getString(value, "uuid"),
>>                         getString(value, "event")));
>>             } catch (JSONException e) {
>>                 System.err.println("Field not found");
>>             }
>>         }
>>     }
>>
>> [/CODE]
>>
>>
>>
>>
>> The JSON file is of the following nature, with a 2-level hierarchy for
>> one field:
>> [JSON]
>> {timestamp: 1397731764     payload: {product:
>> Younited     uuid:
>> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>>  platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>>  type: can-usage-v1     event: General,Login,Success}}
>> {timestamp: 1397731765     payload: {product:
>> Younited     uuid:
>> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>>  platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>>  type: can-usage-v1     event: General,App,Opened}}
>> [/JSON]
>>
>>
>>
>> Thanks in advance for helping us to understand where we are going wrong.
>>
>> Anirvan
>>
>
>
>
