spark-dev mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: Running into the same problem as JIRA SPARK-19268
Date Wed, 24 May 2017 22:45:49 GMT
Even if I do a simple count aggregation like the one below, I get the same error as
https://issues.apache.org/jira/browse/SPARK-19268

Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"),
"24 hours", "24 hours"), df1.col("AppName")).count();
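
For what it's worth, a likely cause here is where the checkpoint is configured: in Structured Streaming, `checkpointLocation` is an option on the *writer* (`DataStreamWriter`), and setting it on `readStream` is ignored by the Kafka source, so the query falls back to a temporary local checkpoint directory, which matches the missing state `.delta` file symptom in SPARK-19268. A minimal sketch of the same windowed count with the checkpoint moved to the writer (this assumes a `df1` Dataset with `Timestamp5`/`AppName` columns, a `KafkaSink` ForeachWriter like the one in my code, and a reachable HDFS path, all from my app, not shown in full here):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

// 24-hour tumbling-window count per AppName, same as above.
Dataset<Row> df2 = df1.groupBy(
        functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"),
        df1.col("AppName"))
    .count();

// checkpointLocation must go on the writer, not on readStream;
// hdfsCheckPointDir is the same HDFS path used in my readStream snippet.
StreamingQuery query = df2.writeStream()
    .outputMode("update")
    .option("checkpointLocation", hdfsCheckPointDir)
    .foreach(new KafkaSink())
    .start();
query.awaitTermination();
```

This sketch only moves the option; it needs a running Spark 2.1.x session with the Kafka source on the classpath to actually execute.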


On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth909@gmail.com> wrote:

> Hi All,
>
> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
> Kafka
>
> I am running into the same problem as
> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not KafkaWordCount).
>
> Here is my sample code
>
> *Here is how I create ReadStream*
>
> sparkSession.readStream()
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>                 .option("startingOffsets", "earliest")
>                 .option("failOnDataLoss", "false")
>                 .option("checkpointLocation", hdfsCheckPointDir)
>                 .load();
>
>
> *The core logic*
>
> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
> StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).outputMode("update").start();
> query.awaitTermination();
>
>
> I can also provide any other information you may need.
>
> Thanks!
>
