streams-dev mailing list archives

From "Robert Douglas [W2O Digital]" <>
Subject Re: Proposing Changes to the StreamsProvider interface and StreamsProviderTask
Date Thu, 15 May 2014 16:53:10 GMT
Hi all,

After working with the Streams project a bit, I have noticed some of the
same issues that Matt and Ryan have brought up. I think that Matt's idea
to implement two interfaces (Producer, Listener) would make a great
addition to the project. Not only would it increase efficiency, but it
would also, in my opinion, make the streams themselves easier to construct
and understand.

-- Robert

On 5/7/14, 1:41 PM, "Matthew Hager [W2O Digital]" <> wrote:

>Good Day!
>I would like to throw in my two cents on this, if it pleases the group.
>Here are my thoughts, based on implementations that I have written with
>streams to ensure timely, high-yield execution. Personally, I had to
>override much of LocalStreamsBuilder to fit my use cases, for many of
>the problems described below but from the opposite direction: I have a
>'finite' stream modality whose execution is hindered by being 'polled'
>the way it currently is. This is further complicated by the excessive
>waiting caused by the current 'shutdown' logic.
>As I see it, there are essentially two major use cases that are likely to
>come up. The first is a perpetual stream, which is technically never
>satisfied. The second is a finite stream (HDFS reader, S3 reader, pulling
>a user's timeline, etc.) that has a definite start and end. Here are my
>thoughts on supporting these two models of execution.
>StreamsResultSet - I actually found this to be quite a useful paradigm. A
>queue prevents a buffer overflow, an iterator makes it fun and easy to
>read (I love iterators), and it is simple and succinct. I do, however,
>feel it is best expressed as an interface instead of a class. Personally, I
>had to override almost every method to fit the concept of a 'finite'
>stream without an expensive tear-down cost. What is missing from it, as
>an interface, is the notion of "isRunning" (as Ryan suggested), which
>could easily satisfy both of the aforementioned modalities. I actually
>have a reference implementation of this for finite streams if anyone
>would like to see it or use it.
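A minimal Java sketch of StreamsResultSet as an interface with an isRunning() method, written for illustration only: StreamsDatum, FiniteResultSet, offer() and markDone() are hypothetical stand-ins, not the project's classes, and this is not the reference implementation mentioned above. It assumes a finite reader (an HDFS or S3 reader, say) offers each datum and then calls markDone(). The iterator waits while the reader is running and ends as soon as the reader is done and the queue is empty, so no tear-down timeout is involved.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the project's datum type.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// Sketch: the result set as an interface. Iteration stays the way to read it;
// isRunning() tells a consumer whether more data may still arrive.
interface StreamsResultSet extends Iterable<StreamsDatum> {
    boolean isRunning();
}

// Hypothetical finite implementation: a reader thread offers data and calls
// markDone() once its source is exhausted.
final class FiniteResultSet implements StreamsResultSet {
    private final BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    void offer(StreamsDatum datum) { queue.offer(datum); }
    void markDone() { running = false; }

    @Override public boolean isRunning() { return running; }

    @Override public Iterator<StreamsDatum> iterator() {
        return new Iterator<StreamsDatum>() {
            private StreamsDatum next;

            // Waits for data while the reader runs; returns false only when the
            // reader is done AND the queue is drained.
            @Override public boolean hasNext() {
                while (next == null) {
                    boolean done = !running; // read the flag before polling
                    try {
                        next = queue.poll(10, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    if (next == null && done) return false;
                }
                return true;
            }

            @Override public StreamsDatum next() {
                if (!hasNext()) throw new NoSuchElementException();
                StreamsDatum result = next;
                next = null;
                return result;
            }
        };
    }
}

class FiniteResultSetDemo {
    public static void main(String[] args) {
        FiniteResultSet results = new FiniteResultSet();
        // A thread standing in for a finite reader such as an HDFS or S3 reader.
        new Thread(() -> {
            for (String line : new String[] {"a", "b", "c"}) {
                results.offer(new StreamsDatum(line));
            }
            results.markDone();
        }).start();
        for (StreamsDatum datum : results) {
            System.out.println(datum.document);
        }
    }
}
```

Running FiniteResultSetDemo prints a, b and c, one per line. A perpetual implementation would differ only in never calling markDone().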
>Event Driven - I concur with Matt 100% on this. As currently implemented,
>LocalStreamsBuilder is exceedingly inefficient in terms of both memory
>and execution time. It seems to me that we could abstract out two common
>interfaces to make this happen:
>	* Listener { receive(StreamsDatum); }
>	* Producer { push(StreamsDatum); registerListener(Listener); }
>with the following implementations in place:
>	* Reader implements Producer
>	* Processor implements Producer, Listener
>	* Writer implements Listener
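The two interfaces can be sketched in Java roughly as follows. This is an illustrative sketch, not an existing API: StreamsDatum, AbstractProducer, the in-memory Reader source, the uppercasing Processor and the collecting Writer are all hypothetical. The point is that once each stage pushes to its registered listeners, a builder's job reduces to wiring the graph, and no stage polls another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical stand-in for the project's datum type.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// The two interfaces proposed above.
interface Listener {
    void receive(StreamsDatum datum);
}

interface Producer {
    void push(StreamsDatum datum);
    void registerListener(Listener listener);
}

// Shared fan-out logic: push() hands each datum to every registered listener.
abstract class AbstractProducer implements Producer {
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    @Override public void registerListener(Listener listener) { listeners.add(listener); }

    @Override public void push(StreamsDatum datum) {
        for (Listener listener : listeners) listener.receive(datum);
    }
}

// Reader implements Producer: emits each record of an in-memory source.
class Reader extends AbstractProducer {
    private final List<String> source;
    Reader(List<String> source) { this.source = source; }
    void run() { for (String record : source) push(new StreamsDatum(record)); }
}

// Processor implements Producer and Listener: transforms, then forwards.
class Processor extends AbstractProducer implements Listener {
    private final Function<Object, Object> transform;
    Processor(Function<Object, Object> transform) { this.transform = transform; }

    @Override public void receive(StreamsDatum datum) {
        push(new StreamsDatum(transform.apply(datum.document)));
    }
}

// Writer implements Listener: collects documents instead of persisting them.
class Writer implements Listener {
    final List<Object> written = new ArrayList<>();
    @Override public void receive(StreamsDatum datum) { written.add(datum.document); }
}

class ProducerListenerDemo {
    public static void main(String[] args) {
        Reader reader = new Reader(Arrays.asList("alpha", "beta"));
        Processor upperCase = new Processor(doc -> doc.toString().toUpperCase());
        Writer writer = new Writer();
        // Wiring the graph is all a builder would need to do.
        reader.registerListener(upperCase);
        upperCase.registerListener(writer);
        reader.run();
        System.out.println(writer.written); // [ALPHA, BETA]
    }
}
```

In this sketch a datum reaches the Writer on the same call stack as reader.run(); placing a queue between two stages is what would make that hop asynchronous.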
>In the reference implementations, you can still have queues in place,
>and they can function as meaningful indicators of system performance
>and status. That is, the queue functions as, well, an actual queue, and
>processes are much more asynchronous than they are now. Then
>LocalStreamsBuilder strings all the guys together into their nice
>little workflows, and the events shoot the little Datums down their
>paths until they wind up wherever they are supposed to go, as quickly
>as possible.
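One way a queue can stay in place as a meaningful indicator is as a decorator around a Listener. In this hypothetical Java sketch (QueuedListener and its depth() method are illustrative names, not project API), receive() only enqueues into a bounded queue and a worker thread delivers to the wrapped listener. The upstream producer then blocks only when the stage is saturated, and the queue depth can be reported as a status metric.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the project's datum type.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

interface Listener {
    void receive(StreamsDatum datum);
}

// Hypothetical decorator: receive() only enqueues; a dedicated worker thread
// delivers each datum to the wrapped listener.
final class QueuedListener implements Listener, AutoCloseable {
    private final BlockingQueue<StreamsDatum> queue;
    private final Thread worker;
    private volatile boolean closed = false;

    QueuedListener(Listener downstream, int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        worker = new Thread(() -> {
            try {
                // Keep delivering until closed AND the backlog is empty.
                while (!closed || !queue.isEmpty()) {
                    StreamsDatum datum = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (datum != null) downstream.receive(datum);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    // Blocks the upstream producer only while the bounded queue is full.
    @Override public void receive(StreamsDatum datum) {
        try {
            queue.put(datum);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Current backlog: a steadily growing depth means this stage is falling behind.
    int depth() { return queue.size(); }

    // Call after the upstream producer has finished; waits for the backlog to drain.
    @Override public void close() {
        closed = true;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class QueuedListenerDemo {
    public static void main(String[] args) {
        QueuedListener stage = new QueuedListener(
                datum -> System.out.println("wrote " + datum.document), 100);
        for (int i = 0; i < 5; i++) stage.receive(new StreamsDatum(i));
        System.out.println("backlog right after pushing: " + stage.depth());
        stage.close();
    }
}
```

No expected output is given for the demo because the two threads' prints may interleave. A bounded queue is used so that a slow stage pushes back on its producer instead of growing without limit.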
>Pardon the long response; I tend to be wordy. Great discussion, and thanks
>to everyone for indulging my thoughts!
>Smashew (Matthew Hager)
>Matthew Hager
>Director - Data Sciences Software
>W2O Digital
>E Cesar Chavez St., Suite 300, Austin, Texas 78702
>direct 512.551.0891 | cell 512.949.9603
>twitter iSmashew | linkedin Matthew Hager
>On 5/6/14, 10:58 AM, "Steve Blackmon" <> wrote:
>>On Tue, May 6, 2014 at 8:24 AM, Matt Franklin <> wrote:
>>> On Mon, May 5, 2014 at 1:15 PM, Steve Blackmon <> wrote:
>>>> What I meant to say re #1 below is that batch-level metadata could be
>>>> useful for modules downstream of the StreamsProvider /
>>>> StreamsPersistReader, and the StreamsResultSet gives us a class to
>>>> which we can add new metadata in core as the project evolves, or
>>>> supplement on a per-module or per-implementation basis via
>>>> subclassing.  Within a provider there's no need to modify or extend
>>>> StreamsResultSet to maintain and utilize state from a third-party API.
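To illustrate the subclassing route, here is a minimal Java sketch. StreamsResultSet below is a simplified stand-in for the core class, and PagedResultSet with its nextPageToken field is a hypothetical module-level extension for a paged third-party API; none of these names are taken from the project. Generic code iterates the batch as usual, while code that knows the subclass can read the batch-level metadata.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Hypothetical stand-in for the project's datum type.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// Simplified stand-in for a class-based result set: an iterable queue of datums.
class StreamsResultSet implements Iterable<StreamsDatum> {
    private final Queue<StreamsDatum> queue;
    StreamsResultSet(Queue<StreamsDatum> queue) { this.queue = queue; }
    Queue<StreamsDatum> getQueue() { return queue; }
    @Override public Iterator<StreamsDatum> iterator() { return queue.iterator(); }
}

// Hypothetical module-level subclass: batch-level metadata from a paged API
// travels with the batch rather than being copied onto every datum.
class PagedResultSet extends StreamsResultSet {
    private final String nextPageToken; // null when there are no more pages

    PagedResultSet(Queue<StreamsDatum> queue, String nextPageToken) {
        super(queue);
        this.nextPageToken = nextPageToken;
    }

    String getNextPageToken() { return nextPageToken; }
    boolean hasMorePages() { return nextPageToken != null; }
}

class PagedResultSetDemo {
    public static void main(String[] args) {
        Queue<StreamsDatum> page = new ArrayDeque<>();
        page.add(new StreamsDatum("post-1"));
        page.add(new StreamsDatum("post-2"));
        StreamsResultSet batch = new PagedResultSet(page, "cursor-abc");

        // Generic code only sees a StreamsResultSet and iterates it...
        for (StreamsDatum datum : batch) System.out.println(datum.document);

        // ...while code that knows the subclass can read the batch metadata.
        if (batch instanceof PagedResultSet) {
            PagedResultSet paged = (PagedResultSet) batch;
            System.out.println("next page: " + paged.getNextPageToken());
        }
    }
}
```

Running the demo prints post-1, post-2 and then next page: cursor-abc.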
>>> I agree that in batch mode, metadata might be important.  In
>>> conversations with other people, I think what might be missing is a
>>> completely event-driven mode where a provider pushes to the rest of
>>> the stream rather than being polled.
>>That would certainly be nice, but I see it as primarily a run-time
>>concern.  We should add additional methods to the core interfaces if
>>we need them to make a push run-time (backed by camel, nsq, activemq,
>>0mq, etc...) work, but let's stay vigilant to keep the number of
>>methods on those interfaces to a minimum so we don't end up with a)
>>classes that do a lot of stuff in core b) an effective partition
>>between methods necessary for perpetual and batch modes c) lots of
>>modules that implement just one or the other.  Modules that don't
>>implement all run modes are already a problem.
>>So who wants to volunteer to write a push-based run-time module?
>>>> I think I would support making StreamsResultSet an interface rather
>>>> than a class.
>>> +1 on interface
>>>> Steve Blackmon
>>>> On Mon, May 5, 2014 at 12:07 PM, Steve Blackmon <>
>>>> wrote:
>>>> > Comments on this in-line below.
>>>> >
>>>> > On Thu, May 1, 2014 at 4:38 PM, Ryan Ebanks <>
>>>> wrote:
>>>> >> The use and implementations of the StreamsProviders seem to have
>>>> >> drifted away from what they were originally designed for.  I
>>>> >> recommend that we change the StreamsProvider interface and
>>>> >> StreamsProviderTask to reflect current usage patterns and to be
>>>> >> more efficient.
>>>> >>
>>>> >> Current Problems:
>>>> >>
>>>> >> 1.) newPerpetualStream in LocalStreamsBuilder is not perpetual.
>>>> >> StreamsProviderTask will shut down after a certain number of empty
>>>> >> returns from the provider.  A perpetual stream implies that it will
>>>> >> run in perpetuity.  If I open a Twitter Gardenhose that is returning
>>>> >> tweets with obscure keywords, I don't want my stream shutting down
>>>> >> if it is quiet for a few time periods.
>>>> >>
>>>> >> 2.) StreamsProviderTask assumes that a single read* call will return
>>>> >> all the data for that request.  This means that if I do a readRange,
>>>> >> the provider has to hold all of the data for that range in memory
>>>> >> and return it as a StreamsResultSet.  I believe readPerpetual was
>>>> >> designed to get around this problem.
>>>> >>
>>>> >> Proposed Fixes/Changes:
>>>> >>
>>>> >> Fix 1.) Remove the StreamsResultSet.  No current implementations use
>>>> >> it for anything other than a wrapper around a Queue that is then
>>>> >> iterated over.  StreamsProvider will now return a Queue<StreamsDatum>
>>>> >> instead of a StreamsResultSet.  This will allow providers to queue
>>>> >> data as they receive it, and the StreamsProviderTask can pop datums
>>>> >> off as soon as they are available.  It will help fix problem #2, as
>>>> >> well as help lower memory usage.
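A minimal Java sketch of the Fix 1 idea, under stated assumptions: PagingProvider and readRange(int, int) are hypothetical, the "source" is just a range of integer ids, and the method returns a BlockingQueue (a Queue subtype) so the consumer can wait with take(). A background thread fills a bounded queue, so the caller can consume the first datum before the last one is fetched, and at most capacity datums sit in memory at once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the project's datum type.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// Hypothetical provider: readRange returns a queue immediately and a
// background thread fills it as records are "fetched".
final class PagingProvider {
    private final int capacity;

    PagingProvider(int capacity) { this.capacity = capacity; }

    BlockingQueue<StreamsDatum> readRange(int fromId, int toId) {
        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(capacity);
        Thread pager = new Thread(() -> {
            try {
                for (int id = fromId; id < toId; id++) {
                    // Stands in for fetching one record from a source API;
                    // put() blocks whenever the consumer falls behind.
                    queue.put(new StreamsDatum(id));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pager.setDaemon(true);
        pager.start();
        return queue;
    }
}

class PagingProviderDemo {
    // Consumes the range as it arrives and sums the ids.
    static long sumRange(int fromId, int toId, int capacity) {
        BlockingQueue<StreamsDatum> queue = new PagingProvider(capacity).readRange(fromId, toId);
        long sum = 0;
        try {
            for (int i = fromId; i < toId; i++) {
                sum += (Integer) queue.take().document; // waits for the next datum
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumRange(0, 1000, 10)); // 499500
    }
}
```

The demo consumer knows the range size only to keep the sketch short; knowing in general when a provider has finished is what the isRunning() proposal addresses.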
>>>> >>
>>>> >
>>>> > I'm not convinced this is a good idea.  StreamsResultSet is a useful
>>>> > abstraction even if no modules are using it as more than a wrapper
>>>> > around a Queue at the moment.  For example, read* in a provider or
>>>> > persist reader could return batch-level (as opposed to datum-level)
>>>> > metadata from the underlying API, which would be useful state for
>>>> > the provider.  Switching to Queue would eliminate our ability to add
>>>> > those capabilities at the core level or at the module level.
>>>> >
>>>> >> Fix 2.) Add a method, public boolean isRunning(), to the
>>>> >> StreamsProvider interface.  The StreamsProviderTask can call this
>>>> >> method to see if the provider is still operating.  This will help
>>>> >> fix problems #1 and #2.  It will allow the provider to run
>>>> >> multithreaded, queue data as it becomes available, and notify the
>>>> >> task when it's done so that the task can be shut down properly.
>>>> >> It will also allow the stream to run in perpetuity, as the task
>>>> >> won't shut down providers that have gone quiet.
>>>> >>
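The proposed task loop might look like the following Java sketch. QueueingProvider, ProviderTask and BurstyProvider are hypothetical names, and pollMillis is an assumed polling interval. The task keeps no count of empty returns: it drains the queue, sleeps when there is nothing to do, and exits only when isRunning() was already false before a drain, which guarantees nothing the provider queued is left behind. A perpetual provider that never reports false keeps the task alive through quiet periods.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the project's datum type.
final class StreamsDatum {
    final Object document;
    StreamsDatum(Object document) { this.document = document; }
}

// Hypothetical provider contract: read* returns a Queue, and isRunning()
// reports whether the provider may still add to it.
interface QueueingProvider {
    Queue<StreamsDatum> readCurrent();
    boolean isRunning();
}

// Hypothetical task loop with no empty-return counter.
final class ProviderTask implements Runnable {
    private final QueueingProvider provider;
    private final Consumer<StreamsDatum> downstream;
    private final long pollMillis;

    ProviderTask(QueueingProvider provider, Consumer<StreamsDatum> downstream, long pollMillis) {
        this.provider = provider;
        this.downstream = downstream;
        this.pollMillis = pollMillis;
    }

    @Override public void run() {
        Queue<StreamsDatum> queue = provider.readCurrent();
        while (!Thread.currentThread().isInterrupted()) {
            // Read the flag BEFORE draining: if it was already false, everything
            // the provider queued is visible and gets drained below.
            boolean running = provider.isRunning();
            StreamsDatum datum;
            while ((datum = queue.poll()) != null) downstream.accept(datum);
            if (!running) return;
            try {
                Thread.sleep(pollMillis); // a quiet provider is simply polled again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

// Hypothetical finite provider with quiet periods between datums; a perpetual
// provider would simply never set running to false.
final class BurstyProvider implements QueueingProvider {
    private final Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    @Override public Queue<StreamsDatum> readCurrent() {
        new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    Thread.sleep(30); // quiet period: the task sees empty polls
                    queue.offer(new StreamsDatum(i));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running = false; // tell the task no more data is coming
            }
        }).start();
        return queue;
    }

    @Override public boolean isRunning() { return running; }
}

class ProviderTaskDemo {
    public static void main(String[] args) {
        new ProviderTask(new BurstyProvider(),
                datum -> System.out.println("got " + datum.document), 5).run();
    }
}
```

Running ProviderTaskDemo prints got 0, got 1 and got 2, even though each 30 ms quiet period produces several empty polls.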
>>>> >
>>>> > I think this is a good idea.  +1
>>>> >
>>>> >> Right now the StreamsProvider and StreamsProviderTask seem to be
>>>> >> full of short-term fixes that need to be redesigned into long-term
>>>> >> solutions.  With enough positive feedback, I will create Jira tasks
>>>> >> and a feature branch, and begin work.
>>>> >>
>>>> >> Sincerely,
>>>> >> Ryan Ebanks
