flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: Flink not reading from Kafka
Date Thu, 23 Feb 2017 20:11:03 GMT
Hi Debasish,

Did you execute Flink in a distributed setting? print() writes the
stream contents to stdout on the respective worker node (TaskManager), not
on the machine that submitted the job.

On Thu, Feb 23, 2017 at 5:41 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
wrote:

> I was facing a similar problem yesterday. In my case print() was not
> working. Try adding a sink and writing the output to another Kafka topic.
> Something like
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java#L71
>
> It worked for me. Is stdout somehow disabled by default?
>
> regards.
>
> On Thu, Feb 23, 2017 at 9:42 PM, Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> Hi Mohit,
>>
>> is there new data being produced into the topic?
>> The properties.setProperty("auto.offset.reset", "earliest"); setting
>> only applies if this consumer group has not committed any offsets yet.
>> So if you have already read all the data in the topic with this group,
>> you won't see anything new showing up.
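
The rule described above can be illustrated with a small toy model in plain
Java. This is only a sketch under stated assumptions, not Kafka's or Flink's
actual implementation: the class OffsetResetSketch, the method startingOffset,
and the in-memory map standing in for the broker's committed-offset store are
all hypothetical names made up for illustration. It also ignores the case
where a committed offset is out of range.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResetSketch {

    /**
     * Toy model (hypothetical, not a Kafka API): decide where a consumer
     * starts reading. A committed offset for the group always wins;
     * auto.offset.reset is only consulted when no offset was committed.
     */
    static long startingOffset(Map<String, Long> committedOffsets,
                               String groupId,
                               String topicPartition,
                               String autoOffsetReset,
                               long logStartOffset,
                               long logEndOffset) {
        Long committed = committedOffsets.get(groupId + "/" + topicPartition);
        if (committed != null) {
            return committed; // resume where the group left off
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                // models "none": no committed offset and no fallback policy
                throw new IllegalStateException("no committed offset for " + groupId);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the broker-side offset store (an assumption of this sketch).
        Map<String, Long> committedOffsets = new HashMap<>();

        // Fresh group: "earliest" applies, reading starts at the log start.
        System.out.println(startingOffset(committedOffsets, "test1", "test-0", "earliest", 0L, 100L));

        // The group has read everything and committed offset 100.
        committedOffsets.put("test1/test-0", 100L);

        // Same group again: "earliest" is ignored, only new records are seen.
        System.out.println(startingOffset(committedOffsets, "test1", "test-0", "earliest", 0L, 100L));

        // A different group.id has no committed offset, so "earliest" applies again.
        System.out.println(startingOffset(committedOffsets, "test2", "test-0", "earliest", 0L, 100L));
    }
}
```

In this model, switching to a group.id that has never committed offsets (or
producing new records into the topic) is what makes data show up again.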
>>
>> On Sat, Feb 18, 2017 at 2:09 AM, Mohit Anchlia <mohitanchlia@gmail.com>
>> wrote:
>>
>>> Interestingly enough, the same job runs OK on Linux but not on Windows.
>>>
>>> On Fri, Feb 17, 2017 at 4:54 PM, Mohit Anchlia <mohitanchlia@gmail.com>
>>> wrote:
>>>
>>>> I have this code that tries to read from a topic. However, the Flink
>>>> process comes up and waits forever even though there is data in the
>>>> topic. I'm not sure why. Has anyone else seen this problem?
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.createLocalEnvironment();
>>>>
>>>> Properties properties = new Properties();
>>>> properties.setProperty("bootstrap.servers", "xxxx:9092");
>>>> properties.setProperty("group.id", "test1");
>>>> properties.setProperty("auto.offset.reset", "earliest");
>>>>
>>>> FlatMapFunction<Integer, Tuple2<Integer, Integer>> flatMapper = //something
>>>>
>>>> DataStream<String> stream = env
>>>>     .addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties));
>>>>
>>>> stream.map(s -> Integer.valueOf(s)).flatMap(flatMapper).returns(
>>>>     new TypeHint<Tuple2<Integer, Integer>>() {
>>>>     }).print();
>>>>
>>>> JobExecutionResult res = env.execute();
>>>>
>>>>
>>>>
>>>> 02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(4/4) switched to RUNNING
>>>>
>>>> 02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(1/4) switched to RUNNING
>>>>
>>>> 02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(2/4) switched to RUNNING
>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
