spark-user mailing list archives

From "Chandra Mohan, Ananda Vel Murugan" <Ananda.Muru...@honeywell.com>
Subject RE: Get the previous state string in Spark streaming
Date Mon, 19 Oct 2015 07:12:29 GMT
Thank you, we got it fixed.

Regards,
Anand.C

From: Tathagata Das [mailto:tdas@databricks.com]
Sent: Friday, October 16, 2015 3:38 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: Get the previous state string in Spark streaming

A quick look at the Javadoc for java.util.List <http://docs.oracle.com/javase/7/docs/api/java/util/List.html#add(E)>
shows that List.add can throw UnsupportedOperationException if the implementation of the List
interface does not support that operation (for example, a fixed-size or unmodifiable list). A good
place to start would be to print the parameter passed to List.add() and the class of the list
it is called on. Even better, if you can reproduce it locally, you will get the prints
in the driver logs. Otherwise you will have to hunt for that print statement in the executor
logs.
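
As a rough illustration of the debugging step described above, here is a small standalone sketch in plain Java (no Spark involved). It prints the list's class and the argument before calling add(), and shows one well-known case where add() is unsupported: the fixed-size list returned by Arrays.asList. The class name ListAddDemo and the helper tryAdd are hypothetical, and whether the list Spark hands to the update function is of this kind is an assumption that the printed class name would confirm or rule out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListAddDemo {
    // Hypothetical helper: prints the concrete list class and the argument,
    // then reports whether add() succeeded.
    static boolean tryAdd(List<String> list, String value) {
        System.out.println("list class = " + list.getClass().getName()
                + ", value = " + value);
        try {
            list.add(value);
            return true;
        } catch (UnsupportedOperationException e) {
            System.out.println("add() is not supported by " + list.getClass().getName());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> fixedSize = Arrays.asList("a", "b");    // fixed-size view over an array
        List<String> growable = new ArrayList<>(fixedSize);  // mutable copy of the same data

        System.out.println(tryAdd(fixedSize, "c")); // false: add() throws on this list
        System.out.println(tryAdd(growable, "c"));  // true
    }
}
```

If the printed class turns out to be a read-only or fixed-size implementation, copying it into a new ArrayList before modifying it is the usual way around the exception.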

Also, I am not sure what you are trying to do, but using state.toString() (which is Optional.toString)
to compare with null, and then trying to add that to a list, both seem pretty weird to me.
Are you sure that what you want to do works as you intend? It might be worth trying this out locally
first with println() to see whether the logic actually works as you think it should.
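
To make the concern above concrete, here is a minimal sketch of the kind of local experiment suggested. It assumes the goal is to append the new events to the previous state. To stay runnable without Spark on the classpath it uses java.util.Optional as a stand-in; the code in this thread uses a different Optional class from the Spark Java API, so treat the Optional type here as an assumption. The names StateMergeSketch and merge are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class StateMergeSketch {
    // Hypothetical stand-in for the body of the update function:
    // previous state first, then the new events, in a fresh mutable list.
    static Optional<List<String>> merge(List<String> events, Optional<List<String>> state) {
        List<String> merged = new ArrayList<>();
        if (state.isPresent()) {          // check presence instead of toString() == null
            merged.addAll(state.get());
        }
        merged.addAll(events);            // the input list is only read, never modified
        return Optional.of(merged);
    }

    public static void main(String[] args) {
        Optional<List<String>> none = Optional.empty();
        // toString() on an Optional returns a descriptive string, never null,
        // so a check like state.toString() == null is always false.
        System.out.println(none.toString() == null); // false

        List<String> batch = Arrays.asList("c", "d");
        Optional<List<String>> previous = Optional.of(Arrays.asList("a", "b"));
        System.out.println(merge(batch, none).get());     // [c, d]
        System.out.println(merge(batch, previous).get()); // [a, b, c, d]
    }
}
```

Building a new ArrayList rather than calling add() on the incoming list sidesteps the question of whether that list is modifiable at all.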

Hope this helps.

On Fri, Oct 16, 2015 at 2:23 AM, Chandra Mohan, Ananda Vel Murugan <Ananda.Murugan@honeywell.com>
wrote:
Hi,

Thanks for the response. We are trying to implement something similar to what is discussed in the
following Stack Overflow post.

http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation

We are doing it in Java, while the accepted answer (the second answer) in this post is in Scala.


We wrote our Java code using this Scala code as a reference, but we are getting an exception on
the line marked below, i.e. return Optional.of(events.add(state.toString());); Specifically,
it happens when we call events.add().



final Function2<List<String>, Optional<List<String>>, Optional<List<String>>> updateFunc =
        new Function2<List<String>, Optional<List<String>>, Optional<List<String>>>() {

    public Optional<List<String>> call(List<String> events, Optional<List<String>> state)
            throws Exception {
        // TODO Auto-generated method stub
        if (state.toString() == null)
            return Optional.of(events);
        else {
            // UnsupportedOperationException here
            return Optional.of(events.add(state.toString()););
        }
    }
};

Please let us know if you need more details. Unfortunately we are not in a position to share
the whole code.

Thanks



Regards,

Anand/Yogesh
From: Tathagata Das [mailto:tdas@databricks.com]
Sent: Friday, October 16, 2015 1:22 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: user
Subject: Re: Get the previous state string in Spark streaming

It's hard to help without any stack trace associated with the UnsupportedOperationException.

On Thu, Oct 15, 2015 at 10:40 PM, Chandra Mohan, Ananda Vel Murugan <Ananda.Murugan@honeywell.com>
wrote:

One of my co-workers (Yogesh) was trying to post this to the Spark mailing list, and it seems it
did not get through. So I am reposting it here. Please help.





Hi,

I am new to Spark and was trying to do some experiments with it.



I have a JavaPairDStream<String, List<String>>.

I want to get the list of strings from its previous state. For that I use the updateStateByKey
function as follows:



final Function2<List<String>, Optional<List<String>>, Optional<List<String>>> updateFunc =
        new Function2<List<String>, Optional<List<String>>, Optional<List<String>>>() {

    public Optional<List<String>> call(List<String> arg0, Optional<List<String>> arg1)
            throws Exception {
        // TODO Auto-generated method stub
        if (arg1.toString() == null)
            return Optional.of(arg0);
        else {
            arg0.add(arg1.toString());
            return Optional.of(arg0);
        }
    }
};



I want the function to append the new list of strings to the previous list and return the new
list, but I am not able to do so. I am getting an UnsupportedOperationException error.

Can anyone help me get the desired output?


