MIME-Version: 1.0
From: Bharath Kumara Subramanian
Date: Wed, 8 Jan 2020 09:18:08 -0800
Subject: Re: Reduce kafka partition for a topic samza is using
To:
dev@samza.apache.org
Content-Type: multipart/alternative; boundary="000000000000368e20059ba414ec"

--000000000000368e20059ba414ec
Content-Type: text/plain; charset="UTF-8"

There are a few ways to copy the data.

   1. Use a vanilla Kafka consumer that consumes data from the old topic and
   produces it to the new topic with fewer partitions.
   2. Write a Samza job that reads from your old topic and funnels the data to
   the new topic.

I'd recommend following up with the Kafka community too if you are looking
for more options.

You typically don't have to delete the checkpoint and coordinator topics.
The checkpoints of the new tasks should overwrite those of the old tasks.
However, you might be left with some stale data, since the new topic has
fewer partitions and hence fewer tasks. The coordinator topic stores config
for the most part, and it is possible that it holds some stale topic
configurations (if any).

Hope that helps.

Thanks,
Bharath

On Tue, Jan 7, 2020 at 6:19 AM Debraj Manna wrote:

> Thanks Bharath for replying.
>
> Samza job is stateless and running in YARN cluster.
>
> If I follow the below approach.
>
>    1. Create a temp kafka topic
>    2. Copy the messages from old topic to the temp topic
>    3. Delete old topic
>    4. Create new topic with required partitions
>    5. Delete old topic
>    6. Copy messages from temp topic to new topic
>
> What do I have to do with the __samza_checkpoint and __samza_coordinator_
> topics? Should I also delete them?
>
> Can you explain what you mean by reroute?
>
> On Mon, Jan 6, 2020 at 7:40 PM Bharath Kumara Subramanian <
> codin.martial@gmail.com> wrote:
>
> > Hi Debraj,
> >
> > Kafka doesn't support reducing the partition count of a topic; it only
> > supports increasing it.
> > One way to accomplish it would be to create a new topic with the desired
> > partition count and reroute data from the old topic. However, it would be
> > good to first understand the use case behind your request. Can you
> > shed some light on this?
> >
> > If the partition count of an input topic changes, the implications for
> > a Samza job are as follows:
> >
> >    1. For stateless jobs, the job is shut down, and if you are running in
> >    cluster mode (YARN), containers typically get restarted and pick up the
> >    change. In the case of Standalone, a new rebalance is triggered.
> >    2. For stateful jobs, the shutdown behavior is the same. However,
> >    depending on the choice of grouper, it might result in additional tasks
> >    or a reduced number of tasks, which would invalidate some of the state
> >    associated with the tasks. If you have changelog enabled, you might
> >    need to recreate the changelog topic; otherwise, you might run into
> >    validation or correctness issues with your application.
> >
> > How Samza detects partition count changes is shown in [1], and the
> > actions it takes can be found in [2].
> >
> > Thanks,
> > Bharath
> >
> >
> > [1] -
> > https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
> > [2] -
> > https://github.com/apache/samza/blob/beb5e1b40c07c092bc6e14aafc131d96eda5fcd4/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java#L370
> >
> >
> > On Mon, Jan 6, 2020 at 4:31 AM Debraj Manna
> > wrote:
> >
> > > Anyone any thoughts on this?
> > >
> > > On Sat, Jan 4, 2020 at 5:16 PM Debraj Manna
> > > wrote:
> > >
> > > > I am using samza on yarn with Kafka. I need to reduce the number of
> > > > partitions in kafka. I am ok with some data loss. Can someone suggest
> > > > what should be the recommended way of doing this?
> > > >
> > > > Samza Job Config looks like this -
> > > >
> > > > job.factory.class = org.apache.samza.job.yarn.YarnJobFactory
> > > > task.class = com.vnera.grid.task.GenericStreamTask
> > > > task.window.ms = 100
> > > > systems.kafka.samza.factory = org.apache.samza.system.kafka.KafkaSystemFactory
> > > > systems.kafka.consumer.zookeeper.connect = localhost:2181
> > > > systems.kafka.consumer.auto.offset.reset = largest
> > > > systems.kafka.producer.metadata.broker.list = localhost:9092
> > > > systems.kafka.producer.producer.type = sync
> > > > systems.kafka.producer.batch.num.messages = 1
> > > > systems.kafka.samza.key.serde = string
> > > > serializers.registry.string.class = org.apache.samza.serializers.StringSerdeFactory
> > > > yarn.package.path = file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> > > > yarn.container.count = ${container.count}
> > > > yarn.am.container.memory.mb = ${samzajobs.memory.mb}
> > > > job.name = job4
> > > > task.inputs = kafka.Topic3
> > > >

--000000000000368e20059ba414ec--
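
The first copy option in the reply at the top of this thread (consume the old topic, produce to a new topic with fewer partitions) can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: topics are plain Python lists rather than real Kafka topics, the names `target_partition` and `copy_topic` are hypothetical, and `zlib.crc32` stands in for the murmur2 hash that Kafka's default partitioner applies to keyed records. A real copy would use a Kafka consumer and producer instead.

```python
import zlib


def target_partition(key: str, partition_count: int) -> int:
    """Choose a partition from the record key.

    Assumption: crc32 is a stand-in for Kafka's murmur2-based
    default partitioner; only the "hash(key) mod N" idea matters here.
    """
    return zlib.crc32(key.encode("utf-8")) % partition_count


def copy_topic(old_partitions, new_partition_count):
    """Copy every (key, value) record into a layout with fewer partitions.

    Each old partition is read in offset order, so records that share a
    key (and therefore shared one old partition) keep their relative order.
    """
    new_partitions = [[] for _ in range(new_partition_count)]
    for records in old_partitions:
        for key, value in records:
            index = target_partition(key, new_partition_count)
            new_partitions[index].append((key, value))
    return new_partitions


# Hypothetical old topic with 4 partitions, copied into 2 partitions.
old_topic = [
    [("a", 1), ("a", 2)],
    [("b", 1)],
    [("c", 1), ("c", 2), ("c", 3)],
    [("d", 1)],
]
new_topic = copy_topic(old_topic, 2)

for number, records in enumerate(new_topic):
    print(f"partition {number}: {records}")
```

Because the model assigns partitions by key, all records for a given key land in a single new partition, which is the property a keyed Samza job usually relies on. Records without keys would need a different assignment rule.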