samza-dev mailing list archives

From Debraj Manna <subharaj.ma...@gmail.com>
Subject Re: Reduce kafka partition for a topic samza is using
Date Tue, 07 Jan 2020 14:18:50 GMT
Thanks Bharath for replying.

Samza job is stateless and running in YARN cluster.

Suppose I follow the approach below:

   1. Create a temp kafka topic
   2. Copy the messages from the old topic to the temp topic
   3. Delete the old topic
   4. Create a new topic with the required partitions
   5. Copy messages from the temp topic to the new topic
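
To make the copy steps concrete, here is a minimal in-memory sketch of moving keyed messages through a temp topic onto a topic with fewer partitions. It does not talk to Kafka. The helper names (`partition_for`, `create_topic`, `copy_topic`), the partition counts 8 and 4, and the sample messages are all assumptions for illustration, and `zlib.crc32` is only a stand-in for a real hash partitioner.

```python
import zlib
from typing import Dict, List, Tuple

Message = Tuple[str, str]          # (key, value)
Topic = Dict[int, List[Message]]   # partition id -> messages in offset order


def partition_for(key: str, num_partitions: int) -> int:
    """Hypothetical hash partitioner: a stable hash of the key, modulo partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def create_topic(num_partitions: int) -> Topic:
    """Model a topic as a dict of empty partitions."""
    return {p: [] for p in range(num_partitions)}


def copy_topic(src: Topic, dst: Topic) -> None:
    """Re-partition every message from src onto dst's partitions by key."""
    for p in sorted(src):
        for key, value in src[p]:
            dst[partition_for(key, len(dst))].append((key, value))


# Old topic with 8 partitions and some sample data (made up for this sketch).
old = create_topic(8)
for i in range(20):
    key = f"k{i}"
    old[partition_for(key, 8)].append((key, f"v{i}"))

temp = create_topic(8)     # create the temp topic
copy_topic(old, temp)      # copy old -> temp
old = None                 # "delete" the old topic
new = create_topic(4)      # recreate it with the reduced partition count
copy_topic(temp, new)      # copy temp -> new

total = sum(len(msgs) for msgs in new.values())
```

In this model every message survives and messages with the same key stay in their original relative order, but ordering across different keys that now share a partition is whatever order the copy happens to read them in.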

What do I have to do with the __samza_checkpoint and __samza_coordinator_ topics?
Should I also delete them?

Can you explain what you mean by reroute?

On Mon, Jan 6, 2020 at 7:40 PM Bharath Kumara Subramanian <
codin.martial@gmail.com> wrote:

> Hi Debraj,
>
> Kafka doesn't support reducing the partition count of a topic; it only
> supports increasing it.
> One way to accomplish it would be to create a new topic with the desired
> partition count and reroute data from the old topic. That said, it would be
> good to first understand the use case behind your request. Can you
> shed some light on this?
>
> In the event of a change to the input topic's partition count, the
> implications for a Samza job are as follows:
>
>    1. For stateless jobs, the job is shut down and, if you are running in
>    cluster mode (YARN), containers typically get restarted and pick up the
>    change. In the case of Standalone, a new rebalance is triggered.
>    2. For stateful jobs, the shutdown behavior is the same. However, based
>    on the choice of the grouper, it might result in additional tasks or a
>    reduced number of tasks, which would invalidate some of the state
>    associated with the tasks. If you have changelog enabled, you might need
>    to recreate the changelog topic; otherwise, you might run into
>    validation issues or correctness issues with your application.
>
> How Samza detects partition count changes can be found in [1], and the
> actions it takes in [2].
>
> Thanks,
> Bharath
>
>
> [1] - https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
> [2] - https://github.com/apache/samza/blob/beb5e1b40c07c092bc6e14aafc131d96eda5fcd4/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java#L370
>
>
>
> On Mon, Jan 6, 2020 at 4:31 AM Debraj Manna <subharaj.manna@gmail.com>
> wrote:
>
> > Does anyone have any thoughts on this?
> >
> > On Sat, Jan 4, 2020 at 5:16 PM Debraj Manna <subharaj.manna@gmail.com>
> > wrote:
> >
> > > I am using samza on yarn with Kafka. I need to reduce the number of
> > > partitions in kafka. I am ok with some data loss. Can someone suggest
> > > the recommended way of doing this?
> > >
> > > Samza Job Config looks like this -
> > >
> > > job.factory.class = org.apache.samza.job.yarn.YarnJobFactory
> > > task.class = com.vnera.grid.task.GenericStreamTask
> > > task.window.ms = 100
> > > systems.kafka.samza.factory = org.apache.samza.system.kafka.KafkaSystemFactory
> > > systems.kafka.consumer.zookeeper.connect = localhost:2181
> > > systems.kafka.consumer.auto.offset.reset = largest
> > > systems.kafka.producer.metadata.broker.list = localhost:9092
> > > systems.kafka.producer.producer.type = sync
> > > systems.kafka.producer.batch.num.messages = 1
> > > systems.kafka.samza.key.serde = string
> > > serializers.registry.string.class = org.apache.samza.serializers.StringSerdeFactory
> > > yarn.package.path = file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> > > yarn.container.count = ${container.count}
> > > yarn.am.container.memory.mb =  ${samzajobs.memory.mb}
> > > job.name = job4
> > > task.inputs = kafka.Topic3
> > >
> >
>
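
As a rough illustration of the point in the quoted reply that the number of tasks follows the input partition count, the sketch below models a grouper that creates one task per partition id. This is an assumption about the grouper in use, not a statement of what this job is configured with. The function `group_by_partition`, the task naming, and the partition counts 8 and 4 are hypothetical; only the topic name `Topic3` comes from the config above.

```python
from typing import Dict, List


def group_by_partition(inputs: Dict[str, int]) -> Dict[str, List[str]]:
    """Map task name -> input partitions, assuming one task per partition id.

    `inputs` maps a stream name to its partition count.
    """
    tasks: Dict[str, List[str]] = {}
    for stream, count in sorted(inputs.items()):
        for p in range(count):
            tasks.setdefault(f"Partition {p}", []).append(f"{stream}-{p}")
    return tasks


before = group_by_partition({"Topic3": 8})   # assumed original partition count
after = group_by_partition({"Topic3": 4})    # assumed reduced partition count

# Tasks that existed before the change but have no partition to read afterwards.
orphaned = sorted(set(before) - set(after))
```

Under this assumption, shrinking the topic removes the tasks for the dropped partition ids, so any per-task state or checkpoint recorded for them no longer has an owner.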
