flink-user mailing list archives

From Christophe Salperwyck <christophe.salperw...@gmail.com>
Subject Re: HBase Input Format for streaming
Date Mon, 06 Jun 2016 15:21:29 GMT
As a workaround, I changed nextRecord to return null instead of throwing:

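public T nextRecord(final T reuse) throws IOException {
    if (this.rs == null) {
        // throw new IOException("No table result scanner provided!");
        // No scanner yet (open() not called): return null so that the
        // source function opens the next input split.
        return null;
    }
    ...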

because in the class FileSourceFunction we have:

@Override public void run(SourceContext<OUT> ctx) throws Exception {
    while (isRunning) {
        OUT nextElement = serializer.createInstance();
        nextElement = format.nextRecord(nextElement);
        if (nextElement == null && splitIterator.hasNext()) {
            format.open(splitIterator.next()); continue;
        } else if (nextElement == null) { break; }
        ctx.collect(nextElement);
    }
}

(I had to copy TableInputSplit as its constructor is not visible...)
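The control flow above can be checked without Flink or HBase. The following is a minimal, self-contained sketch, not Flink code: `SimpleInputFormat`, `LenientFormat` and `runSource` are hypothetical names, a split is modelled as a plain list of strings, and the reuse object is simply `null`. It copies the loop from `FileSourceFunction.run()` quoted above and pairs it with a format that, like the workaround, returns `null` while no split is open.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SplitLoopSketch {

    // Hypothetical stand-in for an input format; one split is modelled as a list of records.
    interface SimpleInputFormat<T> {
        void open(List<T> split) throws IOException;
        T nextRecord(T reuse) throws IOException;
    }

    // Mirrors the workaround: return null instead of throwing when no split is open yet.
    static class LenientFormat implements SimpleInputFormat<String> {
        private Iterator<String> scanner; // plays the role of the HBase result scanner

        @Override
        public void open(List<String> split) {
            this.scanner = split.iterator();
        }

        @Override
        public String nextRecord(String reuse) {
            if (scanner == null || !scanner.hasNext()) {
                return null; // no record available: the loop below opens the next split
            }
            return scanner.next();
        }
    }

    // Same control flow as the quoted FileSourceFunction.run(), without Flink types
    // (no isRunning flag, no serializer; records are collected into a list).
    static <T> List<T> runSource(SimpleInputFormat<T> format, Iterator<List<T>> splitIterator)
            throws IOException {
        List<T> collected = new ArrayList<>();
        while (true) {
            T nextElement = format.nextRecord(null);
            if (nextElement == null && splitIterator.hasNext()) {
                format.open(splitIterator.next());
                continue;
            } else if (nextElement == null) {
                break;
            }
            collected.add(nextElement);
        }
        return collected;
    }

    public static void main(String[] args) throws IOException {
        List<List<String>> splits = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(runSource(new LenientFormat(), splits.iterator())); // prints [a, b, c]
    }
}
```

Because the very first `nextRecord()` call returns `null`, the loop opens the first split itself, so the format does not need to be opened beforehand.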


2016-06-06 16:07 GMT+02:00 Ufuk Celebi <uce@apache.org>:

> From the code it looks like the open method of the TableInputFormat is
> never called. What are you doing differently in the
> StreamingTableInputFormat?
>
> – Ufuk
>
>
> On Mon, Jun 6, 2016 at 1:49 PM, Christophe Salperwyck
> <christophe.salperwyck@gmail.com> wrote:
> > Hi all,
> >
> > I am trying to read data from HBase and use the window functions of
> > Flink streaming. I can read my data using the ExecutionEnvironment but not
> > from the StreamExecutionEnvironment.
> >
> > Is that a known issue?
> >
> > Are the input splits used in the streaming environment?
> >
> > Here is a sample of my code:
> >
> > final StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> > @SuppressWarnings("serial")
> > final DataStreamSource<ANA> anaDS = env.createInput(new TableInputFormat<ANA>() {
> >     ...
> > });
> >
> > final WindowedStream<ANA, Tuple, TimeWindow> ws = anaDS
> >         .assignTimestampsAndWatermarks(new xxxxAssignerWithPunctuatedWatermarks())
> >         .keyBy(0)
> >         .timeWindow(Time.days(30), Time.days(30));
> >
> > ws.sum(2).printToErr();
> > env.execute();
> >
> > The error I get is:
> > Caused by: java.io.IOException: No table result scanner provided!
> >     at org.apache.flink.addons.hbase.TableInputFormat.nextRecord(TableInputFormat.java:103)
> >
> > It seems the first "Result" is not read before this function is called.
> >
> > I built a "StreamingTableInputFormat" as a temporary workaround, but let me
> > know if I did something wrong.
> >
> > Thanks for everything, Flink is great!
> >
> > Cheers,
> > Christophe
>
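The failure in the quoted message can be reproduced in the same spirit. This is again a hypothetical sketch, not the real `TableInputFormat`: `StrictFormat` keeps only the null-scanner check reported at `TableInputFormat.java:103`, and `firstIterationError` runs just the first iteration of the quoted `FileSourceFunction` loop, assuming, as Ufuk points out, that `open()` has not been called before the first `nextRecord()`.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class StrictFormatSketch {

    // Hypothetical stand-in keeping only the null-scanner check from the stack trace.
    static class StrictFormat {
        private Iterator<String> scanner; // only set by open()

        void open(List<String> split) {
            this.scanner = split.iterator();
        }

        String nextRecord(String reuse) throws IOException {
            if (scanner == null) {
                throw new IOException("No table result scanner provided!");
            }
            return scanner.hasNext() ? scanner.next() : null;
        }
    }

    // Runs only the first iteration of the quoted loop and returns the error message,
    // or null if that iteration succeeds.
    static String firstIterationError(StrictFormat format, Iterator<List<String>> splits) {
        try {
            String first = format.nextRecord(null); // called before any open()
            if (first == null && splits.hasNext()) {
                format.open(splits.next()); // not reached for a fresh StrictFormat, which throws first
            }
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(Arrays.asList("a"));
        // prints: No table result scanner provided!
        System.out.println(firstIterationError(new StrictFormat(), splits.iterator()));
    }
}
```

With this check, the first iteration fails before the loop can call `open()`, which matches the `IOException` in the quoted stack trace; returning `null` instead lets the loop open the first split.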
