flink-user mailing list archives

From Gary Yao <g...@data-artisans.com>
Subject Re: all task managers reading from all kafka partitions
Date Sat, 18 Nov 2017 09:28:39 GMT
Hi Robert,

Running a single job does not mean that you are limited to a single JVM.

For example, a job with parallelism 4 by default requires 4 task slots to
run. You can provision 4 single-slot TaskManagers on different hosts to
connect to the same JobManager. The JobManager can then take your job and
distribute the execution on the 4 slots. To learn more about the distributed
runtime environment:


https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html
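
To make the difference between the two setups concrete, here is a minimal
sketch. It is not Flink's actual code: it assumes a simple round-robin rule
(partition index modulo parallelism), and the real connector logic may differ
between versions. The class and method names are hypothetical. The point is
only that subtasks of one job split the partitions among themselves, while
separate jobs each read every partition.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical helper: returns the partitions one subtask would read,
    // assuming (for illustration only) a modulo-based round-robin assignment.
    static List<Integer> assign(int numPartitions, int parallelism, int subtaskIndex) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtaskIndex) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        int numPartitions = 10;

        // One job with parallelism 4: the 10 partitions are split across 4 subtasks.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("single job, p=4, subtask " + subtask
                    + " -> " + assign(numPartitions, 4, subtask));
        }

        // Four independent jobs with parallelism 1: each job has only subtask 0,
        // which therefore reads all 10 partitions, so every message is seen 4 times.
        for (int job = 0; job < 4; job++) {
            System.out.println("job " + job + ", p=1, subtask 0"
                    + " -> " + assign(numPartitions, 1, 0));
        }
    }
}
```

Under these assumptions, the first loop prints four disjoint partition lists,
and the second prints the full list of 10 partitions four times, which matches
the "subtask 0 will start reading the following 10 partitions" log line quoted
further down.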

Regarding your concerns about job failures, a failure in the JobManager or
one of the TaskManagers can bring your job down but Flink has built-in
fault-tolerance on different levels. You may want to read up on the
following topics:

- Data Streaming Fault Tolerance:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
- Restart Strategies:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/restart_strategies.html
- JobManager High Availability:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html

Let me know if you have further questions.

Best,

Gary

On Fri, Nov 17, 2017 at 11:11 PM, r. r. <robert@abv.bg> wrote:

> Hmm, but I want single slot task managers and multiple jobs so that if one
> job fails it doesn't bring the whole setup (for example 30+ parallel
> consumers) down.
> What setup would you advise? The job is quite heavy and might bring the VM
> down if run with such concurrency in one JVM.
>
> Thanks!
>
>  >-------- Original message --------
>  >From: Gary Yao gary@data-artisans.com
>  >Subject: Re: all task managers reading from all kafka partitions
>  >To: "r. r." <robert@abv.bg>
>  >Sent: 17.11.2017 22:58
>
> >    Forgot to hit "reply all" in my last email.
>
> >      On Fri, Nov 17, 2017 at 8:26 PM, Gary Yao <gary@data-artisans.com> wrote:
> >
> >        Hi Robert,
> >
> >        To get your desired behavior, you should start a single job with
> >        parallelism set to 4.
> >
> >        Flink does not rely on Kafka's consumer groups to distribute the
> >        partitions to the parallel subtasks. Instead, Flink does the
> >        assignment of partitions itself and also tracks and checkpoints the
> >        offsets internally. This is needed to achieve exactly-once semantics.
> >
> >        The group.id that you are setting is used for different purposes,
> >        e.g., to track the consumer lag of a job.
> >
> >        Best,
> >
> >        Gary
> >
> >          On Fri, Nov 17, 2017 at 7:54 PM, r. r. <robert@abv.bg> wrote:
> >
> >            Hi
> >
> >            it's Flink 1.3.2, Kafka 0.10.2.0
> >
> >            I am starting 1 JM and 4 TM (with 1 task slot each). Then I deploy
> >            4 times (via ./flink run -p1 x.jar), job parallelism is set to 1.
> >
> >            A new thing I just noticed: if I start in parallel to the Flink
> >            jobs two kafka-console-consumer (with --consumer-property
> >            group.id=TopicConsumers) and write a msg to Kafka, then one of the
> >            console consumers receives the msg together with both Flink jobs.
> >
> >            I thought maybe the Flink consumers didn't receive the group
> >            property passed via "flink run .. --group.id TopicConsumers", but
> >            no - they do belong to the group as well:
> >
> >            taskmanager_3  | 2017-11-17 18:29:00,750 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
> >            taskmanager_3  |     auto.commit.interval.ms = 5000
> >            taskmanager_3  |     auto.offset.reset = latest
> >            taskmanager_3  |     bootstrap.servers = [kafka:9092]
> >            taskmanager_3  |     check.crcs = true
> >            taskmanager_3  |     client.id =
> >            taskmanager_3  |     connections.max.idle.ms = 540000
> >            taskmanager_3  |     enable.auto.commit = true
> >            taskmanager_3  |     exclude.internal.topics = true
> >            taskmanager_3  |     fetch.max.bytes = 52428800
> >            taskmanager_3  |     fetch.max.wait.ms = 500
> >            taskmanager_3  |     fetch.min.bytes = 1
> >            taskmanager_3  |     group.id = TopicConsumers
> >            taskmanager_3  |     heartbeat.interval.ms = 3000
> >            taskmanager_3  |     interceptor.classes = null
> >            taskmanager_3  |     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> >            taskmanager_3  |     max.partition.fetch.bytes = 1048576
> >            taskmanager_3  |     max.poll.interval.ms = 300000
> >            taskmanager_3  |     max.poll.records = 500
> >            taskmanager_3  |     metadata.max.age.ms = 300000
> >            taskmanager_3  |     metric.reporters = []
> >            taskmanager_3  |     metrics.num.samples = 2
> >            taskmanager_3  |     metrics.recording.level = INFO
> >            taskmanager_3  |     metrics.sample.window.ms = 30000
> >            taskmanager_3  |     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> >            taskmanager_3  |     receive.buffer.bytes = 65536
> >            taskmanager_3  |     reconnect.backoff.ms = 50
> >            taskmanager_3  |     request.timeout.ms = 305000
> >            taskmanager_3  |     retry.backoff.ms = 100
> >            taskmanager_3  |     sasl.jaas.config = null
> >            taskmanager_3  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >            taskmanager_3  |     sasl.kerberos.min.time.before.relogin = 60000
> >            taskmanager_3  |     sasl.kerberos.service.name = null
> >            taskmanager_3  |     sasl.kerberos.ticket.renew.jitter = 0.05
> >            taskmanager_3  |     sasl.kerberos.ticket.renew.window.factor = 0.8
> >            taskmanager_3  |     sasl.mechanism = GSSAPI
> >            taskmanager_3  |     security.protocol = PLAINTEXT
> >            taskmanager_3  |     send.buffer.bytes = 131072
> >            taskmanager_3  |     session.timeout.ms = 10000
> >            taskmanager_3  |     ssl.cipher.suites = null
> >            taskmanager_3  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >            taskmanager_3  |     ssl.endpoint.identification.algorithm = null
> >            taskmanager_3  |     ssl.key.password = null
> >            taskmanager_3  |     ssl.keymanager.algorithm = SunX509
> >            taskmanager_3  |     ssl.keystore.location = null
> >            taskmanager_3  |     ssl.keystore.password = null
> >            taskmanager_3  |     ssl.keystore.type = JKS
> >            taskmanager_3  |     ssl.protocol = TLS
> >            taskmanager_3  |     ssl.provider = null
> >            taskmanager_3  |     ssl.secure.random.implementation = null
> >            taskmanager_3  |     ssl.trustmanager.algorithm = PKIX
> >            taskmanager_3  |     ssl.truststore.location = null
> >            taskmanager_3  |     ssl.truststore.password = null
> >            taskmanager_3  |     ssl.truststore.type = JKS
> >            taskmanager_3  |     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> >            taskmanager_3  |
> >            taskmanager_3  | 2017-11-17 18:29:00,765 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic' was supplied but isn't a known config.
> >            taskmanager_3  | 2017-11-17 18:29:00,765 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> >            taskmanager_3  | 2017-11-17 18:29:00,770 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
> >            taskmanager_3  | 2017-11-17 18:29:00,791 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group TopicConsumers.
> >
> >            I'm running Kafka and Flink jobs in docker containers, the
> >            console-consumers from localhost
> >
> >            >-------- Original message --------
> >            >From: Gary Yao gary@data-artisans.com
> >            >Subject: Re: all task managers reading from all kafka partitions
> >            >To: "r. r." <robert@abv.bg>
> >            >Sent: 17.11.2017 20:02
> >
> >            >      Hi Robert,
> >            >
> >            >      Can you tell us which Flink version you are using?
> >            >
> >            >      Also, are you starting a single job with parallelism 4 or
> >            >      are you starting several jobs?
> >            >
> >            >      Thanks!
> >            >
> >            >      Gary
> >            >
> >            >      On Fri, Nov 17, 2017 at 4:41 PM, r. r. <robert@abv.bg> wrote:
> >            >
> >            >        Hi
> >            >
> >            >        I have this strange problem: 4 task managers each with one
> >            >        task slot, attaching to the same Kafka topic which has 10
> >            >        partitions.
> >            >
> >            >        When I post a single message to the Kafka topic it seems
> >            >        that all 4 consumers fetch the message and start processing
> >            >        (confirmed by TM logs).
> >            >
> >            >        If I run kafka-consumer-groups.sh --describe --group
> >            >        TopicConsumers it says that only one message was posted to
> >            >        a single partition. Next message would generally go to
> >            >        another partition.
> >            >
> >            >        In addition, while the Flink jobs are running on the
> >            >        message, I start two kafka-console-consumer.sh and each
> >            >        would get only one message, as expected.
> >            >
> >            >        On start each of the Flink TM would post something that to
> >            >        me reads as if it would read from all partitions:
> >            >
> >            >        2017-11-17 15:03:38,688 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 partitions from these topics: [TopicToConsume]
> >            >        2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer is going to read the following topics (with number of partitions): TopicToConsume (10),
> >            >        2017-11-17 15:03:38,689 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 10 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, KafkaTopicPartition{topic='TopicToConsume', partition=6}, KafkaTopicPartition{topic='TopicToConsume', partition=7}, KafkaTopicPartition{topic='TopicToConsume', partition=4}, KafkaTopicPartition{topic='TopicToConsume', partition=5}, KafkaTopicPartition{topic='TopicToConsume', partition=2}, KafkaTopicPartition{topic='TopicToConsume', partition=3}, KafkaTopicPartition{topic='TopicToConsume', partition=0}, KafkaTopicPartition{topic='TopicToConsume', partition=1}]
> >            >        2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
> >            >                auto.commit.interval.ms = 5000
> >            >                auto.offset.reset = latest
> >            >
> >            >        Any hints?
