incubator-s4-dev mailing list archives

From Leo Neumeyer <leoneume...@gmail.com>
Subject Re: Dynamic processEvent method overloading
Date Thu, 13 Oct 2011 21:57:05 GMT
OK. I pushed to master in my repo. You can do the merge now.

The relevant API methods look like this:

https://github.com/leoneu/s4-piper/blob/master/subprojects/s4-core/src/main/java/io/s4/core/ProcessingElement.java

    /* what used to be processInputEvent(). */
    abstract protected void onEvent(Event event); // gets called on every event.

    /* what used to be processOutputEvent(). */
    /* it is called based on the triggers for the event type. */
    abstract public void onTrigger(Event event);

    /**
     * This method is called by the PE timer. By default it is synchronized
     * with the {@link #onEvent(Event)} and {@link #onTrigger(Event)} methods.
     * To execute concurrently with other methods, the
     * {@link ProcessingElement} subclass must be annotated with
     * {@code @ThreadSafe}.
     *
     * Override this method to implement a periodic process.
     */
    void onTime() {}

    /**
     * This trigger is fired when the following conditions occur:
     *
     * <ul>
     * <li>An event of eventType arrived at the PE instance
     * <li>numEvents have arrived since the last time this trigger
     * was fired -OR- time since last event is greater than interval.
     * </ul>
     *
     * When the trigger fires, the method <tt>onTrigger(EventType event)</tt>
     * is called, where <tt>EventType</tt> matches the argument eventType.
     *
     * @param eventType
     *            - the type of event on which this trigger will fire.
     * @param numEvents
     *            - number of events since last trigger activation. Must be
     *            greater than zero. (Set to one to trigger on every input
     *            event.)
     * @param interval
     *            - minimum time between triggers.
     * @param timeUnit
     *            - the TimeUnit for the argument interval.
     */
    public void setTrigger(Class<? extends Event> eventType, int numEvents,
            long interval, TimeUnit timeUnit) {}
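
For illustration, here is a minimal sketch of how the firing rule documented for setTrigger() could be evaluated. It is not the s4-piper implementation: the class CountOrIntervalTrigger and its offer() method are hypothetical names, timestamps are passed in by the caller so the behavior is deterministic, and treating a non-positive interval as "no time condition" is an assumption of this sketch.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of s4-piper: evaluates the documented rule. */
final class CountOrIntervalTrigger {
    private final Class<?> eventType;
    private final int numEvents;
    private final long intervalNanos;
    private int countSinceFire = 0;   // matching events since the last firing
    private long lastEventNanos = 0;  // arrival time of the previous matching event
    private boolean seenEvent = false;

    CountOrIntervalTrigger(Class<?> eventType, int numEvents,
            long interval, TimeUnit timeUnit) {
        if (numEvents <= 0) {
            throw new IllegalArgumentException("numEvents must be greater than zero");
        }
        this.eventType = eventType;
        this.numEvents = numEvents;
        this.intervalNanos = timeUnit.toNanos(interval);
    }

    /**
     * Offers an event that arrived at nowNanos. Returns true when the trigger
     * fires: the event matches eventType AND (numEvents matching events have
     * arrived since the last firing OR the time since the previous matching
     * event is greater than the interval).
     */
    boolean offer(Object event, long nowNanos) {
        if (!eventType.isInstance(event)) {
            return false; // events of other types never fire this trigger
        }
        // Assumption: a non-positive interval disables the time condition.
        boolean gapExceeded = seenEvent && intervalNanos > 0
                && (nowNanos - lastEventNanos) > intervalNanos;
        seenEvent = true;
        lastEventNanos = nowNanos;
        countSinceFire++;
        if (countSinceFire >= numEvents || gapExceeded) {
            countSinceFire = 0;
            return true;
        }
        return false;
    }
}
```

With numEvents set to 3 and an interval of 10 seconds, the third matching event fires the trigger, and so does any matching event that arrives more than 10 seconds after the previous one.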

    /**
     * Set a timer that calls {@link #onTime()}.
     *
     * If {@code interval==0} the timer is disabled.
     *
     * @param interval
     *            in timeUnit
     * @param timeUnit
     *            the timeUnit of interval
     */
    public void setTimerInterval(long interval, TimeUnit timeUnit) {}
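
To make the default synchronization between onEvent() and onTime() concrete, here is a rough sketch under stated assumptions. SketchPE, CountingPE, dispatchEvent() and fireTimer() are hypothetical stand-ins rather than the classes in the repo, the @ThreadSafe annotation is redefined locally so the example is self-contained, and using a ScheduledExecutorService for the timer is my choice for the sketch, not necessarily what ProcessingElement does.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical marker annotation; the real one may be declared differently. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ThreadSafe {}

/** Hypothetical stand-in for ProcessingElement, reduced to onEvent/onTime. */
abstract class SketchPE {
    private final Object lock = new Object();
    // Subclasses annotated with @ThreadSafe opt out of the shared lock.
    private final boolean threadSafe =
            getClass().isAnnotationPresent(ThreadSafe.class);
    // Daemon thread so an active timer does not keep the JVM alive.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "pe-timer");
                t.setDaemon(true);
                return t;
            });
    private ScheduledFuture<?> task;

    protected abstract void onEvent(Object event);

    /** Override to implement a periodic process. */
    protected void onTime() {}

    /** Entry point for input events; holds the lock unless @ThreadSafe. */
    final void dispatchEvent(Object event) {
        if (threadSafe) {
            onEvent(event);
        } else {
            synchronized (lock) { onEvent(event); }
        }
    }

    /** Entry point for the timer; holds the same lock unless @ThreadSafe. */
    final void fireTimer() {
        if (threadSafe) {
            onTime();
        } else {
            synchronized (lock) { onTime(); }
        }
    }

    /** Sets the timer that calls onTime(); interval == 0 disables it. */
    public synchronized void setTimerInterval(long interval, TimeUnit timeUnit) {
        if (task != null) {
            task.cancel(false);
            task = null;
        }
        if (interval == 0) {
            return;
        }
        task = timer.scheduleAtFixedRate(this::fireTimer, interval, interval, timeUnit);
    }
}

/** Example PE: counts events and snapshots the count on every timer tick. */
class CountingPE extends SketchPE {
    int events = 0;
    int ticks = 0;
    int lastReported = 0;

    @Override protected void onEvent(Object event) { events++; }

    @Override protected void onTime() { ticks++; lastReported = events; }
}
```

Because CountingPE is not annotated with @ThreadSafe, a timer tick never interleaves with an onEvent() call in this sketch, so onTime() always sees a consistent event count. A subclass that carries the annotation would skip the lock and has to protect its own state.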



On Thu, Oct 13, 2011 at 9:41 AM, Leo Neumeyer <leoneumeyer@gmail.com> wrote:
> After some discussion on Skype, we converged on the following:
>
> We support synchronous and asynchronous method triggers. The
> synchronous trigger is activated when an input event arrives and the
> asynchronous one is triggered by a timer.
>
> Proposed solution:
>
> * Synchronous methods:
>
> Signature: processEvent(SomeEvent event)
> Default behavior: trigger on every event
> Configuration: configure triggers by Event type, synchronized with all
> processEvent() and periodicTask() methods unless PE is annotated with
> @ThreadSafe
>
> * Asynchronous method:
>
> Signature: periodicTask()
> Default behavior: no trigger
> Configuration: interval in time unit, synchronized with all
> processEvent() methods unless PE is annotated with @ThreadSafe
>
> -leo
>
>
> On Thu, Oct 13, 2011 at 7:21 AM, Matthieu Morel
> <matthieu.morel@gmail.com> wrote:
>> On Wed, Oct 12, 2011 at 8:18 PM, Leo Neumeyer <leoneumeyer@gmail.com> wrote:
>>
>>>
>>> ...
>>>
>>> >
>>> >> A few thoughts:
>>> >>
>>> >> Input/Output names may not make a lot of sense any longer. What about
>>> >> changing to the following:
>>> >>
>>> >> processEvent(Event1 event) with any of the synchronous configurations.
>>> >>
>>> >> timerTask()  called when there is a timer task configured.
>>> >>
>>> >> The timer task needs to be synchronized with the processEvent method
>>> >> (unless the PE has a @ThreadSafe annotation), we can keep the internal
>>> >> dummy event to synchronize but call timerTask() without an Event
>>> >> instead. I'm sure there are other ways of doing this.
>>> >>
>>> >> Does this make sense? We need to think a bit.
>>> >>
>>> >
>>> > Indeed, we lose some consistency in the API by handling output triggers
>>> > in 2 different methods, depending on the trigger (event count based or
>>> > timer based)
>>> >
>>> > The original design (as described in the S4 paper) uses a "processEvent"
>>> > method for input events and an "output" method for triggered outputs. Why
>>> > not just keep that design? For the dynamic overload dispatcher, that
>>> > would just mean removing the code for "processOutputEvent".
>>> >
>>>
>>> Not sure I understand what you propose. I think we are saying the same:
>>>
>>> the processEvent() method can be configured to be triggered:
>>>
>>> - on every matching input event
>>> - on every N matching input events
>>> - on input event if time since last event is greater than delta
>>>
>>
>> This is a part which is not clear to me. My understanding was that the
>> output method might be triggered at periodic intervals, not the input
>> processing method. You seem to want to mix both methods, which is a bit
>> confusing to me. But it is also possible.
>>
>>
>>
>>>
>>> We still need another method for periodic tasks, maybe a good name is
>>> periodicTask()
>>>
>>
>> is it a periodic task that we need (and we may), or is it a periodically
>> invokable output method?
>>
>>
>>
>>> This method is synchronized with processEvent() (unless PE is
>>> @ThreadSafe) and it doesn't need to pass an event (because it is
>>> asynchronous).
>>>
>>> So it's not just removing processOutputEvent(), we need an additional
>>> method for periodic tasks that is synchronized with processEvent().
>>>
>>
>> That's the "output" method in the S4 paper, no?
>>
>>
>>
>> Matthieu
>>
>
>
>
> --
>
> -leo
>



-- 

-leo
