flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: Custom Trigger Implementation
Date Mon, 25 Apr 2016 14:19:30 GMT
Good to hear that!

Kostas

> On Apr 25, 2016, at 12:24 PM, Piyush Shrivastava <piyushjoy@yahoo.co.in> wrote:
> 
> Thanks a lot Kostas. This solved my problem.
>  
> Thanks and Regards,
> Piyush Shrivastava <mailto:piyush@webograffiti.com>
> 
> http://webograffiti.com <http://webograffiti.com/>
> 
> 
> On Monday, 25 April 2016 3:27 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
> 
> 
> Hi,
> 
> Let me add that you should also override the clear() method in order to clear your
> state and delete the pending timers.
> 
> Kostas
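
The clear() advice can be illustrated with a small plain-Java model. This is only a sketch under assumptions: ClearableTriggerSketch and all of its members are hypothetical stand-ins, not Flink API. It assumes the trigger remembers the timestamp of its one pending timer in state, so that clear() knows which timer to delete.

```java
import java.util.TreeSet;

// Plain-Java model only; these names are hypothetical and are not Flink API.
class ClearableTriggerSketch {
    // Stand-in for the timers the runtime holds for one window.
    private final TreeSet<Long> pendingTimers = new TreeSet<>();
    // Stand-in for per-window trigger state: the timestamp of the one timer we own.
    private Long nextFireTime = null;

    // Register a timer and remember its timestamp so it can be deleted later.
    void schedule(long fireTime) {
        pendingTimers.add(fireTime);
        nextFireTime = fireTime;
    }

    // Models clear(): delete the pending timer, then drop the state.
    void clear() {
        if (nextFireTime != null) {
            pendingTimers.remove(nextFireTime);
            nextFireTime = null;
        }
    }

    int pendingTimerCount() {
        return pendingTimers.size();
    }

    boolean hasState() {
        return nextFireTime != null;
    }
}
```

In a real trigger, the equivalent of pendingTimers lives in the runtime, which is why the timestamp has to be kept in state if it is needed again at clean-up time.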
> 
>> On Apr 25, 2016, at 11:52 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>> 
> 
> Hi Piyush,
> 
> In the onElement function, you register a timer every time you receive an element.
> 
> When the next watermark arrives, each of those timers fires. In the flag==false case, this
> leads to every element adding a timer for its timestamp+60000ms. The same happens in the
> flag==true case, with a 20000ms interval.
> 
> What you can try is to set the initial 60 sec timer only once, at the first element, and
> then set all the rest in onEventTime with 20 sec.
> 
> For an example of a custom trigger, have a look here:
> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
> 
> I hope this helped.
> Let me know if you need any help.
> 
> Kostas
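
The suggested pattern (one initial timer at the first element, every later timer set from onEventTime) can be modelled in plain Java to check the resulting fire times. This is a sketch under assumptions, not the trigger from the thread: EarlyThenPeriodicSketch, advanceWatermark and the other names are hypothetical, and the timer set is a simplified stand-in for what the Flink runtime does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of the timer pattern; none of these names are Flink API.
class EarlyThenPeriodicSketch {
    static final long FIRST_DELAY_MS = 60_000L; // first firing, 1 minute after the first element
    static final long PERIOD_MS = 20_000L;      // every later firing, 20 seconds apart

    private final TreeSet<Long> timers = new TreeSet<>(); // pending event-time timers
    private boolean firstTimerSet = false;                // plays the role of the "flag" state
    final List<Long> fireTimes = new ArrayList<>();       // records when the window would fire

    // Only the first element registers a timer; later elements register nothing.
    void onElement(long timestamp) {
        if (!firstTimerSet) {
            timers.add(timestamp + FIRST_DELAY_MS);
            firstTimerSet = true;
        }
    }

    // Each firing schedules exactly one follow-up timer.
    void onEventTime(long timerTimestamp) {
        fireTimes.add(timerTimestamp);
        timers.add(timerTimestamp + PERIOD_MS);
    }

    // Simplified watermark handling: fire every timer at or before the watermark, in order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onEventTime(timers.pollFirst());
        }
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

With elements at 0, 5000 and 10000 ms and a watermark of 100000 ms, this model fires at 60000, 80000 and 100000 ms and keeps a single pending timer, whereas registering a timer per element multiplies the firings.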
> 
>> On Apr 25, 2016, at 11:22 AM, Piyush Shrivastava <piyushjoy@yahoo.co.in> wrote:
>> 
>> Hi all,
>> I want to implement a custom Trigger which fires a GlobalWindow after 1 minute the
>> first time and every 20 seconds after that.
>> I believe I have not got this logic right in the implementation of my custom Trigger.
>> Please help me with this.
>> 
>> Here is the code of my custom Trigger:
>> 
>> public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
>> 
>>     /**
>>      * 
>>      */
>>     private static final long serialVersionUID = 1L;
>>       
>>     private TradeTrigger() {
>>     }
>>     
>>     @Override
>>     public TriggerResult onElement(
>>             Object element,
>>             long timestamp,
>>             W window,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>>             throws Exception {
>>         
>>         ctx.registerEventTimeTimer(timestamp);
>>         return TriggerResult.CONTINUE;
>>         
>>     }
>> 
>>     @Override
>>     public TriggerResult onEventTime(
>>             long timestamp,
>>             W window,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>>             throws Exception {
>>     
>>         ValueState<Boolean> state = ctx.getPartitionedState(
>>                 new ValueStateDescriptor<Boolean>("flag", Boolean.TYPE, false));
>>         
>>         if(state.value()==false){
>>             ctx.registerEventTimeTimer(timestamp+60000);
>>             state.update(true);
>>             return TriggerResult.FIRE;
>>         }else{
>>             System.out.println(""+state.value());
>>             ctx.registerEventTimeTimer(timestamp+20000);
>>             return TriggerResult.FIRE;
>>         }
>>     }
>> 
>>     @Override
>>     public TriggerResult onProcessingTime(
>>             long arg0,
>>             W arg1,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
>>             throws Exception {
>>         // TODO Auto-generated method stub
>>         return TriggerResult.CONTINUE;
>>     }
>>     
>>      
>>     public static <W extends Window> TradeTrigger<W> of() {
>>         return new TradeTrigger<>();
>>     }
>> 
>>     
>> }
>>  
>> Thanks and Regards,
>> Piyush Shrivastava <mailto:piyush@webograffiti.com>
>> 
>> http://webograffiti.com <http://webograffiti.com/>
> 
> 
> 
> 

