flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Getting two types of events from a Window (Trigger)?
Date Fri, 11 Dec 2015 12:59:33 GMT
Hi Niels,
I’m afraid this will not work (if I understood correctly what you are trying to do). When
the trigger is serialized and deserialized, each parallel instance of the trigger gets its
own copy of the QueueSource object. In addition, a separate instance of the QueueSource itself
will be running in each parallel instance of the source operator. And there is no way for the
trigger and the source to communicate, since they might not even run on the same machine in
the end.
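
To illustrate the first point, here is a minimal sketch that uses only plain Java serialization (no Flink involved). The names SerializedCopyDemo, Holder and roundTrip are made up for this example; Holder merely stands in for an object such as QueueSource that carries a queue. It assumes that shipping an object to a parallel instance behaves like a serialize/deserialize round trip, which is what the explanation above describes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SerializedCopyDemo {

    // Hypothetical stand-in for an object like QueueSource that owns a queue.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    }

    // Serializes the object to bytes and reads it back, yielding a new, independent copy.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Holder original = new Holder();

        // Assumption: one copy ends up inside the trigger, another inside the source.
        Holder copyInTrigger = roundTrip(original);
        Holder copyInSource = roundTrip(original);

        copyInTrigger.queue.add("Foo");

        System.out.println(copyInTrigger.queue.size()); // prints 1
        System.out.println(copyInSource.queue.size());  // prints 0
    }
}
```

The element put into one copy never shows up in the other, which matches the "two distinct instances of the queue" symptom reported below.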

Cheers,
Aljoscha
> On 11 Dec 2015, at 13:11, Niels Basjes <Niels@basjes.nl> wrote:
> 
> Hi,
> 
> Just to let you know: I tried passing a SourceFunction but I haven't been able to get that to work (yet).
> 
> I passed an instance of this (see code below) into my Trigger and stored it there as:
>     private QueueSource output;
> and then I called from the onElement something like:
>    output.put("Foo",1234);
> 
> When I run this from my IDE I get two distinct instances of the queue (effect: the stuff I put in doesn't come out at the other end).
> 
> Any pointers on how (and if) this can be fixed are welcome.
> Only if this works will I look into making this generic (I got some type-related exceptions when I tried that).
> 
> Niels
> 
> 
> (The code below is under the Apache 2.0 License, so copy, adapt and improve it if you want to.)
> package nl.basjes.flink.experiments;
> 
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
> 
> import java.util.concurrent.ConcurrentLinkedQueue;
> 
> public class QueueSource extends RichEventTimeSourceFunction<String> {
>     private static final long serialVersionUID = 1L;
> 
>     private volatile boolean isRunning = true;
> 
>     private ConcurrentLinkedQueue<QueueElement> queue = new ConcurrentLinkedQueue<>();
> 
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>     }
> 
>     @Override
>     public void close() throws Exception {
>         super.close();
>     }
> 
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         this.isRunning = true;
> 
>         while (this.isRunning) {
>             // poll() returns null when the queue is empty, so a single call
>             // replaces the separate isEmpty() check.
>             QueueElement queueElement = queue.poll();
>             if (queueElement == null) {
>                 Thread.sleep(1); // Sleep 1 ms before retrying to dequeue again
>                 continue;
>             }
>             ctx.collectWithTimestamp(queueElement.element, queueElement.timestamp);
>         }
>     }
> 
>     @Override
>     public void cancel() {
>         this.isRunning = false;
>     }
> 
>     public void put(String element, long timestamp) {
>         QueueElement queueElement = new QueueElement();
>         queueElement.element = element;
>         queueElement.timestamp = timestamp;
>         queue.add(queueElement);
>     }
> }
> 
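> // Serializable so that a non-empty queue can be serialized together with QueueSource.
> class QueueElement implements java.io.Serializable {
>     private static final long serialVersionUID = 1L;
>     String element;
>     long timestamp;
> }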
> 
> 
> 
> On Fri, Dec 11, 2015 at 11:07 AM, Niels Basjes <Niels@basjes.nl> wrote:
> Thanks.
> 
> The way I solved it now is by creating a class that persists data into something external (right now HBase and/or Kafka) and using that from the trigger to output the data.
> 
> I have two follow-up questions:
> 1) Is it possible to pass an instance of 'SourceFunction' as such a parameter (without breaking Flink)?
> 2) I want to save resources, so I'm using a single instance of my 'Extra data output class' in the instance of the Trigger, thus reusing the connections to the outside across multiple Window instances. Can I assume that a single instance of Trigger will only be used by a single thread? I.e., can I assume that I do not need locking and synchronization?
> 
> Niels
> 
> On Thu, Dec 10, 2015 at 4:14 PM, Stephan Ewen <sewen@apache.org> wrote:
> Hi Niels!
> 
> I think there is no clean way to emit data from a trigger right now; you can only emit data from the window functions.
> 
> You can emit two different kinds of data using an "Either" type. This is built into Scala; for Java we added it in 1.0-SNAPSHOT:
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java
> 
> Maybe being able to emit different types of elements helps your use case...
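
As a rough sketch of the "Either" idea, the plain-Java example below uses a tiny hand-written Either class. It is a hypothetical stand-in written for this illustration and is NOT the Flink class linked above, whose API may differ. The names EitherSketch, lefts and rights are likewise made up. A single output type carries both the regular events (left) and the extra notifications (right), and the consumer separates them afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class EitherSketch {

    // Hypothetical minimal Either for illustration only; not Flink's Either class.
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, true);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, false);
        }

        boolean isLeft() {
            return isLeft;
        }

        L getLeft() {
            if (!isLeft) throw new IllegalStateException("not a left value");
            return left;
        }

        R getRight() {
            if (isLeft) throw new IllegalStateException("not a right value");
            return right;
        }
    }

    // Collects all left values, e.g. the regular per-event output.
    static <L, R> List<L> lefts(List<Either<L, R>> mixed) {
        List<L> result = new ArrayList<>();
        for (Either<L, R> e : mixed) {
            if (e.isLeft()) result.add(e.getLeft());
        }
        return result;
    }

    // Collects all right values, e.g. the "window was purged" notifications.
    static <L, R> List<R> rights(List<Either<L, R>> mixed) {
        List<R> result = new ArrayList<>();
        for (Either<L, R> e : mixed) {
            if (!e.isLeft()) result.add(e.getRight());
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: a window function emits both kinds of records through one output type.
        List<Either<String, Long>> output = new ArrayList<>();
        output.add(Either.left("event-1"));
        output.add(Either.left("event-2"));
        output.add(Either.right(2L)); // purge notification carrying an event count

        System.out.println(lefts(output));  // prints [event-1, event-2]
        System.out.println(rights(output)); // prints [2]
    }
}
```

In a streaming job the same separation would presumably be done downstream of the window operator, with one branch keeping the left values and another keeping the right values.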
> 
> 
> These types of questions have been coming up quite a bit: people want to perform different actions inside the windows on different triggers (on element, on event time).
> 
> As per the discussion with Aljoscha, one way to make this more flexible is to enhance what you can do with custom state:
>   - State has timeouts (for cleanup)
>   - Functions allow you to schedule event-time progress notifications
> 
> Stephan
> 
> 
> 
> On Thu, Dec 10, 2015 at 11:55 AM, Niels Basjes <Niels@basjes.nl> wrote:
> Hi,
> 
> I'm working on something that uses the Flink Window feature.
> I have written a custom Trigger to build the Window I need.
> 
> I am using the Window feature because I need state and I need to expire (and clean) this state after a timeout (I use the onEventTime to do that).
> Because I need the data streaming in real time (augmented with the mentioned state) I 'FIRE' after every event. Just before I 'PURGE' the window I need the fact of this purge (and some of the stats of this Window) as a separate event in a separate 'DataStream'.
> 
> Now the interfaces of the various classes only support output as a single Java type (a very sane choice).
> So what I do right now is put my events on something 'external' (HBase/Kafka) and read them in via a different Source implementation.
> 
> My question: Is there a better way to do this?
> Can I (for example) create a special 'Source' that I can pass as a parameter to my Trigger and then in onEventTime just output a 'new event'?
> 
> What do you recommend?
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 
> 
> 

