Date: Mon, 10 Jul 2017 18:28:00 +0000 (UTC)
From: "Guozhang Wang (JIRA)"
Subject: [jira] [Commented] (KAFKA-5578) Streams Task Assignor should consider the staleness of state directories when allocating tasks

[ https://issues.apache.org/jira/browse/KAFKA-5578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080805#comment-16080805 ]

Guozhang Wang commented on KAFKA-5578:
--------------------------------------

Hey [~damianguy], do you mean that multiple threads could have the state dir for a given task while that task is not on any thread's {{previous active tasks}} list? In the StreamPartitionAssignor we chose the priorities as:

1. Previous active tasks: this information is obtained from {{removeActiveTask}}, so at most one thread can ever have the task in its list.
2. Previous standby tasks: this information is collected from the state dir, so multiple threads could have the task in their lists.
3. Otherwise, pick a random candidate with load balance in mind.

Your concern is that if there is no candidate for priority 1), we may have multiple candidates for priority 2) that actually have different restoration costs, is that correct?
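A minimal Python sketch of the three-tier priority above, to make the priority-2 tie concrete. `ClientState`, `pick_client`, and `assign` are hypothetical names, not the actual StreamPartitionAssignor code, and a deterministic least-loaded choice stands in for the random pick in step 3.

```python
import math
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class ClientState:
    """Hypothetical view of one client (instance/thread) as seen by the assignor."""
    name: str
    prev_active: Set[str] = field(default_factory=set)   # tasks it ran as active last time
    prev_standby: Set[str] = field(default_factory=set)  # tasks it has a local state dir for
    assigned: List[str] = field(default_factory=list)


def pick_client(task: str, clients: List[ClientState], capacity: int) -> ClientState:
    """Choose a client for `task` using the three priorities from the comment."""
    open_clients = [c for c in clients if len(c.assigned) < capacity]
    if not open_clients:
        raise ValueError("no client has spare capacity")

    # 1. Previous active owner: at most one client should ever match.
    for c in open_clients:
        if task in c.prev_active:
            return c

    # 2. Clients with a state dir for the task. Several may match, and nothing
    #    here distinguishes a fresh state dir from a stale one.
    with_state = [c for c in open_clients if task in c.prev_standby]
    if with_state:
        return min(with_state, key=lambda c: len(c.assigned))

    # 3. Fall back to the least-loaded client (deterministic stand-in for
    #    "pick a random candidate with load balance in mind").
    return min(open_clients, key=lambda c: len(c.assigned))


def assign(tasks: List[str], clients: List[ClientState]) -> Dict[str, str]:
    """Assign every task, capping each client at an even share of the tasks."""
    capacity = math.ceil(len(tasks) / len(clients))
    result: Dict[str, str] = {}
    for task in tasks:
        client = pick_client(task, clients, capacity)
        client.assigned.append(task)
        result[task] = client.name
    return result
```

For example, with `a = ClientState("a", prev_standby={"0_1"})` and `b = ClientState("b", prev_active={"0_0"}, prev_standby={"0_1"})`, `assign(["0_0", "0_1"], [a, b])` gives `0_0` to `b` (priority 1) and `0_1` to `a` (priority 2). When two clients both hold a state dir for the same task, the first one wins regardless of how old its data is, which is exactly the case in question.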
I wonder how often, in practice, we would actually hit that issue, given the state dir cleanup process.

> Streams Task Assignor should consider the staleness of state directories when allocating tasks
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5578
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5578
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Damian Guy
>
> During task assignment we use the presence of a state directory to decide which instances take precedence when assigning a task. We first choose previous active tasks, but then fall back to the existence of a state dir. Unfortunately, we don't take into account the recency of the data in the available state dirs. So when a task has run on many instances, we may choose an instance that has relatively old data.
> When doing task assignment we should take into consideration the age of the data in the state dirs. We could use the data from the checkpoint files to determine which instance is most up-to-date and attempt to assign accordingly (obviously making sure that tasks are still balanced across available instances).
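One way the checkpoint-based proposal above could look, as a minimal Python sketch: rank the instances holding a state dir by how many changelog records each would need to replay, skipping instances already at capacity. All names here (`parse_checkpoint`, `restore_lag`, `rank_candidates`, the `app-store-changelog` topic) are hypothetical. The checkpoint layout (a version line, an entry count, then one `topic partition offset` line per partition) is an assumption, and so is the idea that each instance can report its checkpoint offsets and the assignor knows the changelog end offsets.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def parse_checkpoint(text: str) -> Dict[TopicPartition, int]:
    """Parse checkpoint text, ASSUMING: version line, entry count,
    then one `topic partition offset` line per changelog partition."""
    lines = [ln.strip() for ln in text.strip().splitlines()]
    version, count = int(lines[0]), int(lines[1])
    if version != 0:
        raise ValueError(f"unsupported checkpoint version {version}")
    offsets: Dict[TopicPartition, int] = {}
    for line in lines[2:2 + count]:
        topic, partition, offset = line.split()
        offsets[(topic, int(partition))] = int(offset)
    if len(offsets) != count:
        raise ValueError("checkpoint entry count mismatch")
    return offsets


def restore_lag(checkpoint: Dict[TopicPartition, int],
                end_offsets: Dict[TopicPartition, int]) -> int:
    """Records to replay; partitions missing from the checkpoint replay from 0."""
    return sum(max(0, end - checkpoint.get(tp, 0)) for tp, end in end_offsets.items())


def rank_candidates(checkpoints: Dict[str, Dict[TopicPartition, int]],
                    end_offsets: Dict[TopicPartition, int],
                    load: Dict[str, int],
                    capacity: int) -> List[str]:
    """Order instances with a state dir by freshness (lowest lag first),
    skipping those already at capacity and breaking ties by current load."""
    eligible = [inst for inst in checkpoints if load.get(inst, 0) < capacity]
    return sorted(eligible,
                  key=lambda inst: (restore_lag(checkpoints[inst], end_offsets),
                                    load.get(inst, 0), inst))
```

With an end offset of 1000 and checkpoints at 120 (instance `a`) and 990 (instance `b`), `b` ranks first because it has only 10 records to replay; if `b` is already at capacity it is skipped, so the capacity check keeps the balance requirement from the issue while freshness decides among the rest.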