Date: Thu, 1 Jun 2017 03:09:04 +0000 (UTC)
From: "xingHu (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Created] (FLINK-6790) Flink Kafka Consumer Cannot Round Robin Fetch Records

xingHu created FLINK-6790:
-----------------------------

             Summary: Flink Kafka Consumer Cannot Round Robin Fetch Records
                 Key: FLINK-6790
                 URL: https://issues.apache.org/jira/browse/FLINK-6790
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.2.1
            Reporter: xingHu

The Java consumer fails to consume messages in a round-robin fashion, which can lead to unbalanced consumption.

In our use case we have a set of consumers that can take a significant amount of time consuming messages off a topic. For this reason, we are using the pause/poll/resume pattern to ensure the consumer session does not time out.
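For context, here is a minimal sketch of the pause/poll/resume loop we are describing, written directly against the Kafka consumer API. The broker address, group id, topic name, and the process() method are placeholders, not our actual job code:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausePollResumeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "slow-processing-group");   // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("preloaded-topic")); // placeholder topic

        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            // Pause all assigned partitions so the poll() calls made during the slow
            // processing below only keep the session alive and do not fetch more data.
            consumer.pause(consumer.assignment());
            for (ConsumerRecord<byte[], byte[]> record : records) {
                process(record);  // stands in for our long-running processing
                consumer.poll(0); // returns nothing while paused, keeps the consumer alive
            }
            consumer.resume(consumer.assignment());
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // placeholder for the slow, per-record work described above
    }
}

The key point is that poll() is still called while every partition is paused: it returns no records but keeps the consumer from being considered dead.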
The topic being consumed has been preloaded with messages, which means there is a significant message lag when the consumer is first started. To limit how many messages are consumed at a time, the consumer has been configured with max.poll.records=1.

The initial observation is that the client receives a large batch of messages for the first partition it decides to consume from and consumes all of those messages before moving on, rather than returning a message from a different partition on each call to poll. We solved this by configuring max.partition.fetch.bytes to be small enough that only a single message is returned by the broker on each fetch, although this would not be feasible if message sizes were highly variable.

After this change, the consumer largely consumes from a small number of partitions, usually just two, iterating between them until it exhausts them before moving to another partition. This behavior is problematic if the messages have rough time semantics and need to be processed roughly in time order across all partitions.

It would be useful if the source had a pluggable API that allowed custom logic to select which partition to consume from next, thus enabling the creation of a round-robin partition consumer.
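To make the request concrete, here is a rough sketch of the kind of hook being asked for. The interface name and shape are purely hypothetical; nothing like this exists in the Flink Kafka connector or the Kafka client today:

import java.util.List;
import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical callback the source could consult before each fetch to decide
 * which partition to read from next. This is NOT an existing Flink or Kafka API;
 * it only illustrates the pluggable selection logic requested above.
 */
public interface PartitionSelector {
    TopicPartition selectNext(List<TopicPartition> assignedPartitions);
}

/** Example implementation: cycle through the assigned partitions in round-robin order. */
class RoundRobinPartitionSelector implements PartitionSelector {
    private int next = 0;

    @Override
    public TopicPartition selectNext(List<TopicPartition> assignedPartitions) {
        TopicPartition partition = assignedPartitions.get(next % assignedPartitions.size());
        next = (next + 1) % assignedPartitions.size();
        return partition;
    }
}

The source could then pause every assigned partition except the one returned by selectNext() before each poll and resume them afterwards, so records are drained one partition at a time in round-robin order.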