flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "P. Ramanjaneya Reddy" <ramanji...@gmail.com>
Subject Issue adding new IO(InfluxDB) with coder in Apache flink
Date Fri, 30 Jun 2017 05:29:57 GMT
Hi  Dev,

We have developed our own sdk io  functions for read/write InfluxDBIO
operations in apache BEAM.  it is works with default coder, which is
StringUtf8Coder.of().

 PCollection<String> output = pipeline.apply(
            InfluxDbIO.<String>read()
                    .withUri("http://localhost:8086")
                    .withDatabase("beam"));




With reference mongoDB and JDBC, implemented the read function with
setcoder() options in InfluxDB also, but it is not working.

    PCollection<String> output = pipeline.apply(
            InfluxDbIO.<String>read()
                    .withParser(new InfluxDbIO.Parser<String>() {
                      @Override
                      public void parse(String input,
                                      InfluxDbIO.ParserCallback<String>
callback) throws IOException {
                        callback.output(input);
                      }
                    })
                    .withUri("http://localhost:8086")
                    .withDatabase("beam")
                    .withCoder(StringUtf8Coder.of()));----> with coder
getting error as
Hi Beam Dev,

We have developed our own sdk io  functions for read/write InfluxDBIO
operations in apache BEAM.  it is works with default coder, which is
StringUtf8Coder.of().

 PCollection<String> output = pipeline.apply(
            InfluxDbIO.<String>read()
                    .withUri("http://localhost:8086")
                    .withDatabase("beam"));




With reference mongoDB and JDBC, implemented the read function with
setcoder() options in InfluxDB also, but it is not working.

    PCollection<String> output = pipeline.apply(
            InfluxDbIO.<String>read()
                    .withParser(new InfluxDbIO.Parser<String>() {
                      @Override
                      public void parse(String input,
                                     InfluxDbIO.ParserCallback<String>
callback) throws IOException {
                        callback.output(input);
                      }
                    })
                    .withUri("http://localhost:8086")
                    .withDatabase("beam")
                    .withCoder(StringUtf8Coder.of()));----> with coder
getting error as

java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
cast to org.apache.beam.sdk.values.PCollection

Thanks & Regards,
Ramanjaneya
java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
cast to org.apache.beam.sdk.values.PCollection

Thanks & Regards,
Ramanjaneya

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message