flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Program crashes trying to read JSON file
Date Tue, 25 Nov 2014 16:03:46 GMT
Hi,
the problem seems to originate from this statement:
text.flatMap(new SelectDataFlatMap())
   // group by the tuple field "0" and sum up tuple field "1"
   .groupBy(2)
   .sum(2);

The indices in these operations are 0-based, so groupBy(2) and sum(2) both
refer to the third field of the tuple, the String field "event". What do you
expect the result of this operation to be? The aggregate operations currently
work only on numeric data types.
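A minimal plain-Java sketch of both points, with no Flink dependency. The class LogEntry is a hypothetical stand-in for Tuple3<Integer, String, String> as (timestamp, uuid, event), and isSummable is a hypothetical stand-in for the numeric-only rule (it is not Flink's actual check). countByEvent shows one plausible intent, counting records per event type; in Flink the analogous change would be to emit a Tuple2<String, Integer> of (event, 1) from the flatMap and call .groupBy(0).sum(1), as the bundled WordCount example does for words.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java illustration (no Flink dependency). LogEntry, isSummable and
 * countByEvent are hypothetical names used only for this sketch.
 */
public class EventCountSketch {

    // Mirrors Tuple3<Integer, String, String>: f0 = timestamp, f1 = uuid, f2 = event.
    static final class LogEntry {
        final Integer f0;
        final String f1;
        final String f2;

        LogEntry(Integer f0, String f1, String f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // Hypothetical stand-in for the "numeric types only" rule; not Flink's code.
    static boolean isSummable(Class<?> fieldType) {
        return Number.class.isAssignableFrom(fieldType);
    }

    // Counts entries per event type (field index 2), i.e. the plain-Java
    // equivalent of emitting (event, 1) and summing the 1s per group.
    static Map<String, Integer> countByEvent(List<LogEntry> entries) {
        Map<String, Integer> counts = new TreeMap<>();
        for (LogEntry e : entries) {
            counts.merge(e.f2, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Index 2 is the third field (event), a String, so sum(2) is rejected.
        System.out.println(isSummable(String.class));  // false
        System.out.println(isSummable(Integer.class)); // true

        // The first two entries follow the sample records; the third is made up.
        List<LogEntry> entries = Arrays.asList(
                new LogEntry(1397731764, "uuid-1", "General,Login,Success"),
                new LogEntry(1397731765, "uuid-2", "General,App,Opened"),
                new LogEntry(1397731766, "uuid-3", "General,App,Opened"));
        System.out.println(countByEvent(entries));
        // {General,App,Opened=2, General,Login,Success=1}
    }
}
```

Keeping the event as the grouping key and summing a separate Integer count field is what avoids asking sum() to add up Strings.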

Regards,
Aljoscha

On Tue, Nov 25, 2014 at 4:55 PM, Anirvan BASU <anirvan.basu@inria.fr> wrote:
> Hello all,
>
> We are using Flink 0.7 and are trying to read a large JSON file, load some
> of its fields into a Flink DataSet of 3-tuples, and then perform some
> operations on it.
>
> We encountered the following runtime error:
>
> [QUOTE]
> Error: The main method caused an error.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>     at org.apache.flink.client.program.Client.run(Client.java:244)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
>     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>     ... 6 more
> [/QUOTE]
>
>
>
> The code that could have caused this error (i.e. the part we edited) is the
> following:
>
> [CODE]
>
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> import org.apache.sling.commons.json.JSONException;
> ...
>
>     public static void main(String[] args) throws Exception {
>
>         if(!parseParameters(args)) {
>             return;
>         }
>
>         // set up the execution environment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // get input data
>         DataSet<String> text = getTextDataSet(env);
>
>         DataSet<Tuple3<Integer, String, String>> counts =
>                 // split up the lines into 3-tuples containing (timestamp, uuid, event)
>                 text.flatMap(new SelectDataFlatMap())
>                 // group by the tuple field "0" and sum up tuple field "1"
>                 .groupBy(2)
>                 .sum(2);
>
>         // emit result
>         if(fileOutput) {
>             counts.writeAsCsv(outputPath, "\n", " ");
>         } else {
>             counts.print();
>         }
>
>         // execute program
>         env.execute("Weblogs Programme");
>     }
>
> ...
>
>     public static class SelectDataFlatMap extends
>     JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>
>         @Override
>         public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
>                 throws Exception {
>             try {
>                 out.collect(new Tuple3<Integer, String, String>(
>                         getInt(value, "timestamp"),
>                         getString(value, "uuid"),
>                         getString(value, "event")));
>             } catch (JSONException e) {
>                 System.err.println("Field not found");
>             }
>         }
>     }
>
> [/CODE]
>
> The JSON file has the following structure, with one field (payload)
> containing a nested object:
> [JSON]
> {timestamp: 1397731764     payload: {product: Younited     uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd     platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241     type: can-usage-v1     event: General,Login,Success}}
> {timestamp: 1397731765     payload: {product: Younited     uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e     platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e     type: can-usage-v1     event: General,App,Opened}}
> [/JSON]
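In the sample records, uuid and event sit inside payload rather than at the top level, so a lookup for them has to descend one level. A minimal plain-Java sketch of such a lookup, assuming the record has already been parsed into nested Maps (no JSON library is used) and using a hypothetical helper getPath with dot-separated paths. It does not show how JSONParseFlatMap itself resolves field names.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java sketch of a dotted-path lookup in a 2-level record.
 * getPath is a hypothetical helper, not part of Flink or any JSON library.
 */
public class NestedFieldSketch {

    // Follows a dot-separated path such as "payload.event" through nested maps.
    // Returns null if a key is missing or an intermediate value is not a map.
    static Object getPath(Map<String, Object> record, String path) {
        Object current = record;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    public static void main(String[] args) {
        // First sample record, modeled by hand (only some fields shown).
        Map<String, Object> payload = new HashMap<>();
        payload.put("product", "Younited");
        payload.put("event", "General,Login,Success");

        Map<String, Object> record = new HashMap<>();
        record.put("timestamp", 1397731764);
        record.put("payload", payload);

        System.out.println(getPath(record, "timestamp"));     // 1397731764
        System.out.println(getPath(record, "payload.event")); // General,Login,Success
        System.out.println(getPath(record, "event"));         // null (not top-level)
    }
}
```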
>
>
>
> Thanks in advance for helping us to understand where we are going wrong.
>
> Anirvan
