flink-user mailing list archives

From Steffen Wohlers <steffenwohl...@gmx.de>
Subject Re: data enrichment via endpoint, serializable issue
Date Thu, 19 Jul 2018 12:39:42 GMT
Hi Xingcan,

Option two, `RichMapFunction`, works. Thanks a lot!


Thanks,
Steffen

> On 19. Jul 2018, at 13:59, Xingcan Cui <xingcanc@gmail.com> wrote:
> 
> Hi Steffen,
> 
> You could make the class `TextAPIClient` serializable, or use `RichMapFunction` [1] and
> instantiate all the required objects in its `open()` method.
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#rich-functions
> 
> Best,
> Xingcan
> 
>> On Jul 19, 2018, at 6:56 PM, Steffen Wohlers <steffenwohlers@gmx.de> wrote:
>> 
>> Hi all,
>> 
>> I’m new to Apache Flink and I have the following issue:
>> 
>> I would like to enrich data via a map function. To do that, I call a method that calls
>> an endpoint, but I get the following error message:
>> 
>> "The implementation of the MapFunction is not serializable. The object probably
>> contains or references non serializable fields.
>> 	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)" …
>> "Caused by: java.io.NotSerializableException: com.aylien.textapi.TextAPIClient"
>> 
>> Is there a smart way to fix that issue?
>> 
>> Regards,
>> 
>> Steffen
>> 
>> 
>> Map Function:
>> DataStream<TweetSentiment> tweetSentimentDataStream = noRTDataStream
>>         .map(new MapFunction<Tweet, TweetSentiment>() {
>>             @Override
>>             public TweetSentiment map(Tweet tweet) throws Exception {
>>                 String polarity = "good";
>>                 polarity = test.testMethod();
>>                 polarity =  sentimentAnalysis.sentiment(tweet.getText());
>>                 return new TweetSentiment(tweet, polarity, 0);
>>             }
>>         });
>> 
>> Class:
>> 
>> public class SentimentAnalysis implements Serializable {
>> 
>>     private TextAPIClient _sentimentClient;
>> 
>>     public SentimentAnalysis () {
>>         _sentimentClient = new TextAPIClient("xxx", "xxx");
>>     }
>> 
>>     public String sentiment(String text) throws Exception {
>>         SentimentParams sentimentParams = new SentimentParams(text, null, null);
>>         Sentiment sentiment = _sentimentClient.sentiment(sentimentParams);
>> 
>>         return sentiment.getPolarity();
>>     }
>> }
>>
>> (Analysis via Aylien)
> 
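
For readers of this thread, here is a minimal sketch of why the original code fails and why creating the client in `open()` helps. It uses only the JDK so that it runs without Flink or the Aylien SDK. `FakeClient`, `EagerAnalysis`, `LazyAnalysis`, and the helper methods are hypothetical stand-ins invented for illustration, not part of either library. The round trip through Java serialization is an assumed approximation of what happens when a function object is shipped to the workers. In a real job, the `open()` shown here would correspond to overriding `open()` in a class that extends `RichMapFunction`, as suggested in the reply above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// JDK-only demo; no Flink or Aylien dependency is needed to run it.
class LazyClientDemo {

    // Hypothetical stand-in for a non-serializable client such as TextAPIClient.
    static class FakeClient {
        String sentiment(String text) {
            return text.contains("good") ? "positive" : "neutral";
        }
    }

    // Mirrors the failing pattern: the client lives in a regular (non-transient) field.
    static class EagerAnalysis implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeClient client = new FakeClient();

        String sentiment(String text) {
            return client.sentiment(text);
        }
    }

    // Mirrors the open() pattern: the field is transient and is filled in after shipping.
    static class LazyAnalysis implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeClient client;

        // Stand-in for the rich function's open(): call once before processing records.
        void open() {
            client = new FakeClient();
        }

        String sentiment(String text) {
            return client.sentiment(text);
        }
    }

    // Serializes and deserializes a value, wrapping checked exceptions.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns false when serialization fails with NotSerializableException.
    static boolean isSerializable(Serializable value) {
        try {
            roundTrip(value);
            return true;
        } catch (IllegalStateException e) {
            if (e.getCause() instanceof NotSerializableException) {
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println("eager serializable: " + isSerializable(new EagerAnalysis()));
        LazyAnalysis shipped = roundTrip(new LazyAnalysis());
        shipped.open(); // create the client only after deserialization
        System.out.println("polarity: " + shipped.sentiment("good morning"));
    }
}
```

Marking the field `transient` keeps the client out of the serialized form, and `open()` recreates it on the receiving side. The trade-off is that calling `sentiment()` before `open()` would fail with a `NullPointerException`, so the initialization step must not be skipped.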

