flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Better way to read several stream sources
Date Mon, 23 Jan 2017 18:40:11 GMT
Why don't you define one source and make it parallel?

You can implement "RichParallelSourceFunction" and use it to check, at
execution time, which parallel subtask each source instance is running as.
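As a rough illustration of that suggestion: once each parallel instance knows its subtask index, it can pick its own share of the source names. The helper below is a minimal plain-Java sketch. It assumes the full list of source names is known up front (like sourceList in the question below) and is split round-robin. SourceAssignment and sourcesForSubtask are hypothetical names, not Flink API. Inside a RichParallelSourceFunction the two numbers would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(), for example in open().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: decides which source names one parallel subtask should read.
// In a RichParallelSourceFunction, subtaskIndex and numSubtasks would come from
// getRuntimeContext().getIndexOfThisSubtask() and
// getRuntimeContext().getNumberOfParallelSubtasks().
class SourceAssignment {

    // Round-robin split: subtask i takes the sources at positions i, i + n, i + 2n, ...
    static List<String> sourcesForSubtask(List<String> allSources, int subtaskIndex, int numSubtasks) {
        if (numSubtasks <= 0 || subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
            throw new IllegalArgumentException(
                    "invalid subtask " + subtaskIndex + " of " + numSubtasks);
        }
        List<String> assigned = new ArrayList<>();
        for (int p = subtaskIndex; p < allSources.size(); p += numSubtasks) {
            assigned.add(allSources.get(p));
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> sources = Arrays.asList("a", "b", "c", "d", "e");
        for (int i = 0; i < 2; i++) {
            // prints "subtask 0 -> [a, c, e]" and then "subtask 1 -> [b, d]"
            System.out.println("subtask " + i + " -> " + sourcesForSubtask(sources, i, 2));
        }
    }
}
```

With five sources and a parallelism of 2, subtask 0 reads a, c, e and subtask 1 reads b, d. If the parallelism exceeds the number of sources, some subtasks get an empty list and have nothing to read.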

On Mon, Jan 23, 2017 at 6:55 PM, Sendoh <unicorn.banachi@gmail.com> wrote:

> Hi Flink users,
>
> Can I ask what would be the better way to read multiple stream sources?
>
> I have a FooSource which implements SourceFunction and reads one source,
> and I would like to read several FooSources. FooSource basically reads
> data as a stream via an HTTP call.
>
> Option 1:
> Use a for-loop to read several data streams and union them.
>
> It looks like:
> List<DataStream<JSONObject>> streams = new ArrayList<>();
>
> // one FooSource (and one stream) per entry in sourceList
> Iterator<String> sourceIter = sourceList.iterator();
>
> while (sourceIter.hasNext()) {
>     String source = sourceIter.next();
>     streams.add(env.addSource(new FooSource<>(source,
>             new JSONSchema(), properties)).rebalance());
> }
>
> // fold all per-source streams into a single stream with union()
> Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
>
> DataStream<JSONObject> currentStream = streamsIt.next();
> while (streamsIt.hasNext()) {
>     DataStream<JSONObject> nextStream = streamsIt.next();
>     currentStream = currentStream.union(nextStream);
> }
>
> Option 2:
> Implement a SourceFunction that reads many FooSources.
>
> The implementation in FooSources looks like:
>
> @Override
> public void open(Configuration parameters) throws Exception {
>     fooSourceList = new ArrayList<>();
>     LOG.info("Opened");
>     for (String sourceName : sourceNames) {
>         FooSource fooSource = new FooSource(properties, sourceName);
>         fooSource.open(parameters);
>         fooSourceList.add(fooSource);
>         LOG.info("Read source: " + sourceName);
>     }
> }
>
> @Override
> public void run(final SourceContext<String> ctx) throws Exception {
>     LOG.info("Processing");
>     // This won't work: each fooSource.run(ctx) presumably blocks until that
>     // source is cancelled, so only the first one ever runs.
>     // Would a parallel for-loop be fine performance-wise?
>     for (FooSource fooSource : fooSourceList) {
>         fooSource.run(ctx);
>     }
> }
>
> Best,
>
> Sendoh
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-read-several-stream-sources-tp11224.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>
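On the Option 2 question in the quoted message: a streaming SourceFunction's run() normally keeps looping until the source is cancelled, so if FooSource behaves that way, the sequential for-loop never gets past the first FooSource. One way to make that loop "parallel" is a thread per source inside the single run() call. The sketch below is plain Java and rests on stated assumptions: BlockingSource is a hypothetical interface standing in for FooSource, and a shared lock object stands in for ctx.getCheckpointLock(), under which Flink sources are expected to emit records. It omits cancellation. It also keeps all sources inside one source instance, so unlike the parallel source suggested above it does not spread the work across parallel subtasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for FooSource: run() blocks while the source produces records.
interface BlockingSource {
    void run(Consumer<String> emit) throws Exception;
}

class ConcurrentSources {

    // Runs each source on its own thread and waits until all of them return.
    // Records are emitted while holding one shared lock, mirroring how a Flink
    // source emits inside synchronized (ctx.getCheckpointLock()).
    static List<String> runAll(List<BlockingSource> sources) {
        List<String> collected = new ArrayList<>();
        Object lock = new Object(); // stand-in for ctx.getCheckpointLock()
        AtomicReference<Exception> firstError = new AtomicReference<>();
        List<Thread> threads = new ArrayList<>();

        for (BlockingSource source : sources) {
            Thread t = new Thread(() -> {
                try {
                    source.run(record -> {
                        synchronized (lock) {
                            collected.add(record); // stand-in for ctx.collect(record)
                        }
                    });
                } catch (Exception e) {
                    firstError.compareAndSet(null, e); // remember the first failure
                }
            });
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sources", e);
            }
        }
        if (firstError.get() != null) {
            throw new IllegalStateException("a source failed", firstError.get());
        }
        return collected;
    }
}
```

Records from different sources interleave in arbitrary order, while records from any one source keep their relative order.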
