flink-user mailing list archives

From Sathi Chowdhury <Sathi.Chowdh...@elliemae.com>
Subject Re: put record to kinesis and then trying consume using flink connector
Date Tue, 25 Apr 2017 22:56:26 GMT
Hi Gordon,
That was a typo I introduced while masking the stream name. I still had issues with using
LATEST as the initial stream position, so I moved to AT_TIMESTAMP instead, and it works
fine now.
Thanks so much for your response.
Sathi
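
For anyone hitting the same symptom, the switch to AT_TIMESTAMP mentioned above might look roughly like the sketch below. It uses plain java.util.Properties so it runs without the connector on the classpath. The three key strings are assumptions about what ConsumerConfigConstants.AWS_REGION, STREAM_INITIAL_POSITION and STREAM_INITIAL_TIMESTAMP resolve to, and the epoch-seconds timestamp format is also an assumption to verify against the connector version in use. The class name, region and instant are hypothetical.

```java
import java.time.Instant;
import java.util.Properties;

public class AtTimestampConfigSketch {
    // Assumed string values of the corresponding ConsumerConfigConstants fields;
    // prefer the constants themselves when the connector is on the classpath.
    static final String AWS_REGION = "aws.region";
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

    /** Builds consumer properties that ask the source to start reading at a given instant. */
    static Properties consumerConfig(String region, Instant startFrom) {
        Properties config = new Properties();
        config.setProperty(AWS_REGION, region);
        config.setProperty(STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
        // Whole epoch seconds as a plain decimal string (assumed to be an accepted format).
        config.setProperty(STREAM_INITIAL_TIMESTAMP, String.valueOf(startFrom.getEpochSecond()));
        return config;
    }

    public static void main(String[] args) {
        // Capture an instant shortly before the warm-up record is published.
        Instant beforeWarmup = Instant.now().minusSeconds(60);
        Properties config = consumerConfig("us-west-2", beforeWarmup);
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The idea is to choose an instant just before the warm-up put, so that a record written before the consumer starts is still read. With LATEST, a consumer that starts after the put would be expected to see only records that arrive later, which matches the timestamps in the logs quoted below.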

From: "Tzu-Li (Gordon) Tai" <tzulitai@apache.org>
Date: Sunday, April 23, 2017 at 3:32 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: put record to kinesis and then trying consume using flink connector

Hi Sathi,

Here, in the producer-side log, it says:
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published record, of
bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: shardId-000000000000Sequence
number49572539577762637793132019619873654976833479400677703682 Stream Name:mystream

The stream the record was inserted into is “mystream”.

However,

            DataStream<String> outputStream = see.addSource(
                    new FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));

you seem to be consuming from “myStream”.
Could the capital “S” be the issue?
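
A quick way to rule this kind of mismatch in or out is to compare the two names before starting the job. The sketch below is only an illustration: the class and method names are hypothetical, and it assumes stream names are matched case-sensitively.

```java
public class StreamNameCheck {
    /** Classifies how the producer-side and consumer-side stream names relate. */
    static String describeMismatch(String producerStream, String consumerStream) {
        if (producerStream.equals(consumerStream)) {
            return "match";
        }
        if (producerStream.equalsIgnoreCase(consumerStream)) {
            // Same letters, different capitalization: easy to miss when reading logs.
            return "case-only mismatch";
        }
        return "different names";
    }

    public static void main(String[] args) {
        // Names taken from the producer log and the consumer code in this thread.
        System.out.println(describeMismatch("mystream", "myStream"));
    }
}
```

Defining the stream name once as a constant and passing it to both the publisher and the consumer avoids the problem entirely.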

Cheers,
Gordon



On 24 April 2017 at 12:33:28 AM, Sathi Chowdhury (sathi.chowdhury@elliemae.com) wrote:
Hi Flink Dev,
I thought this would work easily with Flink since it is simple enough, yet I am struggling
to make it work.
I am using the Flink Kinesis connector, version 1.3-SNAPSHOT.

Basically, I am trying to bootstrap a stream with one event pushed into it as a warm-up inside
the Flink job's main method, and I use the AWS Kinesis client to simply put a record into a
given stream. My expectation is that if I then addSource on the Kinesis stream, the data stream
will consume the event I pushed.



        // This is the method that pushes to the Kinesis stream "mystream"
        publishToKinesis("mystream", regionName, data);



        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
                ConsumerConfigConstants.InitialPosition.LATEST.toString());
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

        final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
        cluster.start();
        ObjectMapper mapper = new ObjectMapper();
        final StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment(
                "localhost", cluster.getLeaderRPCPort());

        DataStream<String> outputStream = see.addSource(
                new FlinkKinesisConsumer<>("myStream", new MyDeserializerSchema(), consumerConfig));

        for (Iterator<String> it = DataStreamUtils.collect(outputStream); it.hasNext(); ) {
            String actualOut = it.next();
            ObjectNode actualOutNode = (ObjectNode) mapper.readTree(actualOut);

            // Then I want to either print it or do some further validation etc.
        }


……..

I am not sure why FlinkKinesisConsumer does not react to the record that I published;
it keeps waiting for it at the step it.next();


I print out the SequenceNumber I put the record at
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published record, of
bytes :162810 partition key :fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: shardId-000000000000Sequence
number49572539577762637793132019619873654976833479400677703682 Stream Name:mystream


And Flink job is logging this at the end where it is waiting
2017-04-22 19:47:29,423 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: Subtask 0
will be seeded with initial shard KinesisStreamShard{streamName='mystream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber: 49572531519165352852103352736022695959324427654906511362,}}'}, starting
state set as sequence number LATEST_SEQUENCE_NUM
2017-04-22 19:47:29,425 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: Subtask 0
will start consuming seeded shard KinesisStreamShard{streamName='mystream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber: 49572531519165352852103352736022695959324427654906511362,}}'} from
sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0

Any clue will be awesome to clear my confusion.
Thanks
Sathi
=============Notice to Recipient: This e-mail transmission, and any documents, files or previous
e-mail messages attached to it may contain information that is confidential or legally privileged,
and intended for the use of the individual or entity named above. If you are not the intended
recipient, or a person responsible for delivering it to the intended recipient, you are hereby
notified that you must not read this transmission and that any disclosure, copying, printing,
distribution or use of any of the information contained in or attached to this transmission
is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately
notify the sender by telephone or return e-mail and delete the original transmission and its
attachments without reading or saving in any manner. Thank you. =============