streams-dev mailing list archives

From "Matthew Hager [W2O Digital]" <>
Subject Re: Proposing Changes to the StreamsProvider interface and StreamsProviderTask
Date Wed, 14 May 2014 20:12:13 GMT

As always, thanks for your feedback and mentorship as I work to contribute
to this project.

I feel the current pattern for processor is extremely limiting given the
constraint of the return statement rather than the alternative of receive
-> process -> write. It seems that if we revise the current interfaces we
could handle many more use-cases with streams that cannot currently be

Current "StreamsProcessor":

-- Contract --
List<StreamsDatum> process(StreamsDatum datum);

-- Drawbacks --
For the processor to access the 'writer', it must have an item to be
processed. Without an item, it can't write anything.

-- Use Cases Not Satisfied --
In many cases the absence of data on a 'stream' is an event in itself. For
instance, I may want a listener at various points in the process to
ensure my processing data providers are working appropriately. If I
want to send out a message, log an event to Oracle, and alert PagerDuty
when data goes stale, I can write a quick processor that checks for stale
data and then, upon recognizing stale data (a timer expiring), fires the
proper events to its writers.
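
A minimal sketch of the stale-data case, assuming a Listener with a single receive(StreamsDatum) method as quoted later in this thread. StaleDataWatchdog, checkStale, the injected clock, and the simplified StreamsDatum stand-in are all hypothetical names for illustration, not part of the existing streams API. The point is only that checkStale() can write to its listeners without having received an item first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the project's StreamsDatum; only a payload is modeled.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// Assumed listener contract, following Listener { receive(StreamsDatum); } from the thread.
interface Listener {
    void receive(StreamsDatum datum);
}

// Hypothetical processor that forwards data and raises an alert datum after a period of silence.
final class StaleDataWatchdog implements Listener {
    private final long maxSilenceMillis;
    private final LongSupplier clock; // injected so the timing can be controlled
    private final List<Listener> listeners = new ArrayList<>();
    private long lastSeenMillis;

    StaleDataWatchdog(long maxSilenceMillis, LongSupplier clock) {
        this.maxSilenceMillis = maxSilenceMillis;
        this.clock = clock;
        this.lastSeenMillis = clock.getAsLong();
    }

    void registerListener(Listener listener) { listeners.add(listener); }

    // Data arrived: record the time and pass the datum through unchanged.
    @Override
    public void receive(StreamsDatum datum) {
        lastSeenMillis = clock.getAsLong();
        push(datum);
    }

    // Intended to be called periodically by a timer. Pushes an alert datum
    // even though no input datum triggered the call.
    boolean checkStale() {
        long silence = clock.getAsLong() - lastSeenMillis;
        if (silence <= maxSilenceMillis) {
            return false;
        }
        push(new StreamsDatum("STALE: no data for " + silence + " ms"));
        return true;
    }

    private void push(StreamsDatum datum) {
        for (Listener listener : listeners) {
            listener.receive(datum);
        }
    }
}
```

In real use the clock would likely be System::currentTimeMillis and checkStale() would be driven by a scheduler. The registered listeners would be whatever writes to Oracle, PagerDuty, and so on. As written, the sketch raises an alert on every check while the silence lasts.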

Other use cases include debugging, stream summarization metrics,
trending algorithms, processing stats, and many others.

+++ Proposed Interfaces +++

// Analogous to Provider
public interface Provider {

// Analogous to Writer
public interface Listener {

// Analogous to Processor
public interface Processor extends Provider, Listener {
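
The interface bodies did not survive in this archived copy. As a rough illustration only, the sketch below fills them in with method names borrowed from the Listener/Producer idea quoted later in the thread (receive, push, registerListener). Those signatures are assumptions, as are SplittingProcessor and the simplified StreamsDatum stand-in. It shows a processor pushing zero, one, or many results per received item.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the project's StreamsDatum; only a payload is modeled.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// Analogous to Writer: anything that can accept a datum. (Assumed signature.)
interface Listener {
    void receive(StreamsDatum datum);
}

// Analogous to Provider: anything that can emit data to registered listeners. (Assumed signatures.)
interface Provider {
    void registerListener(Listener listener);
    void push(StreamsDatum datum);
}

// Analogous to Processor: receives data, then decides what, if anything, to push.
interface Processor extends Provider, Listener { }

// Hypothetical processor that splits a comma-separated document into one datum per token.
final class SplittingProcessor implements Processor {
    private final List<Listener> listeners = new ArrayList<>();

    @Override
    public void registerListener(Listener listener) { listeners.add(listener); }

    @Override
    public void push(StreamsDatum datum) {
        for (Listener listener : listeners) {
            listener.receive(datum);
        }
    }

    // receive -> process -> write, with no return value constraining the output.
    @Override
    public void receive(StreamsDatum datum) {
        for (String token : String.valueOf(datum.document).split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                push(new StreamsDatum(trimmed));
            }
        }
    }
}
```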


+++ Conclusion +++
I feel the event driven model is more flexible than the current model. We
can write an interface for the legacy 'contract' to ease implementation /


interface LegacyProcessor {
	List<StreamsDatum> process(StreamsDatum datum);


class LegacyProcessorImpl {

	listen(StreamsDatum datum) {
		for (StreamsDatum d : this.transform(datum)) {
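
The rest of this snippet is missing from the archived copy. One way the legacy contract could be bridged onto an event-driven model is sketched below, under stated assumptions: LegacyProcessorAdapter, registerListener, push, and the simplified StreamsDatum stand-in are hypothetical names, and the adapter delegates to process(...) rather than the transform(...) call in the fragment above. It is not a reconstruction of the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the project's StreamsDatum; only a payload is modeled.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// Assumed listener contract, following Listener { receive(StreamsDatum); } from the thread.
interface Listener {
    void receive(StreamsDatum datum);
}

// The existing return-based contract described in this message.
interface LegacyProcessor {
    List<StreamsDatum> process(StreamsDatum datum);
}

// Hypothetical adapter: wraps a LegacyProcessor so it can take part in an event-driven stream.
final class LegacyProcessorAdapter implements Listener {
    private final LegacyProcessor delegate;
    private final List<Listener> listeners = new ArrayList<>();

    LegacyProcessorAdapter(LegacyProcessor delegate) {
        this.delegate = delegate;
    }

    void registerListener(Listener listener) { listeners.add(listener); }

    // Each received datum is run through the legacy process(...) call and
    // every returned datum is pushed on to the registered listeners.
    @Override
    public void receive(StreamsDatum datum) {
        List<StreamsDatum> results = delegate.process(datum);
        if (results == null) {
            return; // treat a null result as "nothing to emit"
        }
        for (StreamsDatum result : results) {
            push(result);
        }
    }

    private void push(StreamsDatum datum) {
        for (Listener listener : listeners) {
            listener.receive(datum);
        }
    }
}
```

With something along these lines, existing processors would not need to be rewritten; they would simply be wrapped when a stream is assembled.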


I really love streams, I think the concept is great, I think adding this
would take streams from "great" --> "OMG, the whole world should use this
all the time for everything ever, it is flipping amazing and better than
ice cream on a hot sunny day."



On 5/13/14, 2:38 PM, "Matt Franklin" <> wrote:

>On Tue, May 6, 2014 at 10:53 PM, Matthew Hager [W2O Digital] <
>> wrote:
><snip />
>> StreamsResultSet - I actually found this to be quite useful paradigm. A
>> queue prevents a buffer overflow, an iterator makes it fun and easy to
>> read (I love iterators), and it is simple and succinct. I do, however,
>> feel it is best expressed as an interface instead of a class.
>I agree with an interface.  IMO, anything that is not a utility style
>helper should be interacted with via its interface.
>> The thing missing from this, as an interface, would be the notion of
>> "isRunning" which could easily
>> satisfy both of the aforementioned modalities.
>> Event Driven - I concur with Matt 100% on this. As currently
>> LocalStreamsBuilder is exceedingly inefficient from a memory perspective
>> and time execution perspective. To me, it seems, that we could almost
>> abstract out 2 common interfaces to make this happen.
>>         * Listener { receive(StreamsDatum); }
>>         * Producer { push(StreamsDatum); registerListener(Listener); }
>> Where the following implementations would place:
>>         * Reader implements Producer
>>         * Processor implements Producer, Listener
>>         * Writer implements Listener
>Seems logical.  I would like to see the two possible operating modes
>represented as distinct interfaces.
