flink-user mailing list archives

From "Foster, Craig" <foscr...@amazon.com>
Subject Re: Wikiedit QuickStart with Kinesis
Date Thu, 01 Sep 2016 14:43:17 GMT
Thanks Gordon. I think I changed all my versions to match the version against which I built the
Kinesis connector, so you were right. That seems to have moved me further: I can write to streams
now. Now all I need to do is figure out how Kinesis is encoding it. :)
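
On the encoding question, one possibility (an assumption, not something confirmed in this thread) is that the records are being read back through the AWS CLI or the raw GetRecords JSON response, where the Data field is base64-encoded. If so, the payload may only need decoding. A minimal sketch using only the JDK; the class name and the sample payload are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class KinesisDataDecoder {

    /** Decodes a base64-encoded "Data" field into UTF-8 text. */
    static String decode(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload: the base64 form of a tuple string such as the
        // map function in the job would emit.
        String sample = Base64.getEncoder()
                .encodeToString("(SomeUser,42)".getBytes(StandardCharsets.UTF_8));
        System.out.println(sample + " -> " + decode(sample));
    }
}
```

This assumes the sink wrote plain UTF-8 strings, which is what SimpleStringSchema would be expected to produce; a different serialization schema would need a different decoding step.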

One issue with the "AUTO" option is that whatever credentials it finds don't seem to
have PutRecords permissions, even though the AWS IAM role I am using ostensibly has them. So
I am back to having credentials in code, which isn't a best practice. I haven't
figured that part out yet either.
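
One guess at the cause, offered as a hypothesis rather than a diagnosis: if "AUTO" hands off to the AWS SDK's default credentials lookup, then credentials found earlier in that lookup (environment variables, JVM system properties, or a ~/.aws/credentials file) would be used before the instance's IAM role. The sketch below assumes that simplified order and the variable and property names as recalled from the AWS SDK for Java 1.x; none of this is confirmed in the thread, and the class and method names are made up for illustration.

```java
import java.io.File;
import java.util.Map;
import java.util.Properties;

final class CredentialSourceCheck {

    /**
     * Returns the first credential source that looks populated, checked in the
     * order ASSUMED for the SDK's default lookup. "instance-role" is the
     * fallback when nothing earlier is present.
     */
    static String firstSource(Map<String, String> env,
                              Properties sysProps,
                              boolean profileFileExists) {
        // Assumed environment variable names (primary and legacy alternate).
        if (env.containsKey("AWS_ACCESS_KEY_ID") || env.containsKey("AWS_ACCESS_KEY")) {
            return "environment";
        }
        // Assumed JVM system property name.
        if (sysProps.getProperty("aws.accessKeyId") != null) {
            return "system-properties";
        }
        if (profileFileExists) {
            return "profile-file";
        }
        return "instance-role";
    }

    public static void main(String[] args) {
        File profile = new File(System.getProperty("user.home"), ".aws/credentials");
        System.out.println(
                firstSource(System.getenv(), System.getProperties(), profile.isFile()));
    }
}
```

To be meaningful, a check like this would have to run on the hosts where the Flink tasks execute and as the same user, since that is where the lookup happens. Anything other than "instance-role" would suggest that a different set of credentials is shadowing the role.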

From: "Tzu-Li (Gordon) Tai" <tzulitai@apache.org>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Thursday, September 1, 2016 at 2:25 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Wikiedit QuickStart with Kinesis

Hi Craig,

I’ve just run a simple test on this and there should be no problem.

What Flink version were you using (the archetype version used with the Flink Quickstart Maven
Archetype)?
Also, on which branch / commit was the Kinesis connector built? Seeing that you’ve used the
“AUTO” credentials provider option, I’m assuming it’s built on the master branch and not a
release branch (the “AUTO” option wasn’t included in any of the release branches yet).

So I suspect it’s due to a version conflict between the two. If so, you should build the
Kinesis connector from the release branch that matches the Flink version you’re using.
Could you check and see if the problem remains? Thanks!
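
One way to check for this kind of mismatch is to print which jar a class was actually loaded from, along with the version recorded in that jar's manifest (if any). The following is a minimal JDK-only sketch; the class name ClassOrigin is hypothetical, and the class names to inspect are passed on the command line rather than assumed here.

```java
import java.security.CodeSource;

final class ClassOrigin {

    /** Describes where a class was loaded from and its manifest version, if known. */
    static String describe(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();
        Package pkg = type.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        return type.getName() + " from " + location
                + " (version " + (version == null ? "unknown" : version) + ")";
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully-qualified class names, for example one class from the Flink
        // runtime and one from the Kinesis connector, and compare the output.
        for (String name : args) {
            System.out.println(describe(Class.forName(name)));
        }
    }
}
```

The version is only reported when the jar's manifest carries an Implementation-Version entry, so "unknown" does not by itself indicate a problem; the jar location is usually the more telling part.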

Regards,
Gordon



On September 1, 2016 at 1:34:19 AM, Foster, Craig (foscraig@amazon.com) wrote:
Hi:
I am using the following WikiEdit example:
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/run_example_quickstart.html

It works when printing the contents to a file or stdout.

But I wanted to modify it to use Kinesis instead of Kafka. So instead of the Kafka part, I put:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

FlinkKinesisProducer<String> kinesis =
    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("my-flink-stream");
kinesis.setDefaultPartition("0");

result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(kinesis);

see.execute();


But I get the following error:

2016-08-31 17:05:41,541 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (2f7d339588fec18e0f2617439ee9be6d) switched from RUNNING to CANCELING
2016-08-31 17:05:41,542 INFO  org.apache.flink.yarn.YarnJobManager - Status of job 43a13707d92da260827f37968597c187 () changed to FAILING.
java.lang.Exception: Serialized representation of org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:803)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Searching Google doesn't turn up much that seems to work. Is there somewhere I should
look for a root cause? I looked in the full log file, but it contains little more than this
stack trace.
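
One place a root cause can hide is further down the exception's cause chain, since the trace above shows only the outer wrappers. As a sketch under that assumption (it is not established in this thread that the inner cause survives serialization), a small JDK-only helper can walk to the innermost cause so it can be logged from inside the job, for example in a try/catch around the failing call. The class name RootCause is hypothetical.

```java
final class RootCause {

    /** Follows getCause() links and returns the innermost throwable. */
    static Throwable of(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain, and guard against a self-referencing cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical nesting that mimics a wrapped failure.
        Throwable wrapped = new RuntimeException(
                "Could not forward element to next operator",
                new IllegalStateException("hypothetical underlying failure"));
        System.out.println(of(wrapped));
    }
}
```

The logs of the task managers, as opposed to the job manager log, may also contain the full chain.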