flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hussein Baghdadi <hussein.baghd...@zalando.de>
Subject How to write a Sink that needs to deal with conditions in thread-safe way?
Date Thu, 02 Mar 2017 15:53:22 GMT
Hello, 

I have some basic questions regarding Sinks in Flink.

1) To implement our own Sink, which class to implement: RichSinkFunction, RichOutputFormat,
etc ..

2) We are trying to write batches in our Sink. For that, in overrided invoke() , we are calling
a synchronised function:

// events = new ConcurrentLinkedQueue<>();

   @Override
   public void invoke(T value) throws Exception {
       this.addEventList(value);
   }

   private synchronized void addEventList(T event) {
       events.add(event);
       if ((new Date()).getTime() >= this.nextFlush.get() || events.size() > this.maxBatchSize)
{
           Response response = null;
           try {
               response = nakadiClient.resources().events().send(eventName, events);
           } catch (final ClientException | IllegalStateException e) {
               logger.error("Error while sending to Nakadi. Error: {}", e.getMessage());
               throw new RuntimeException(e);
           } finally {
               if (response != null) {
                   try {
                       response.responseBody().close();
                       events = new ConcurrentLinkedQueue<>();
                       this.nextFlush.set((new Date()).getTime() + this.millisecondsWait);
                   } catch (final Exception ex) {
                       logger.error("Error happened while trying to close response. {}", ex);
                   }
               }
           }
       }
   }
Mime
View raw message