apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Siyuan Hua (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
Date Mon, 08 Aug 2016 06:31:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411388#comment-15411388

Siyuan Hua commented on APEXMALHAR-2169:

Then I think the problem is in softConstraint and hardConstraint code, it should never return
true because default limit is Long.MAX_VALUE.  

There is something in backlog that I didn't track in Jira(my bad). But since you have issue
here, can you please do some refactor here. 
We want to actually simplify the operator code instead of making it more and more complicate.
And kafka input operator is there for awhile and I don't see any requirement/asking for dynamic
partition based on throughput.
Can we take away the hardConstraint and softConstraint condition check and make the 2 upperbound
property deprecated.  So dynamic partition by default should only happen when kafka partition
And for ONE_TO_MANY partition strategy, the number of operator partitions should stay unchanged
for the whole application with the specified initialPartitionCount. I think there is still
bug there that if new kafka partition is added, we always start a new partition. That is not

And you can create another ticket to move all repartition based on throughput to a separate
Partitioner so the operator code would be simple and easy to understand/debug

> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition
> ----------------------------------------------------------------------------------------------
>                 Key: APEXMALHAR-2169
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Chaitanya
>            Assignee: Chaitanya
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>        strategy : one_to_many
>        initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partition the topic to 5
> Observations:
> (1) Operator re-partitioning is not happened.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: Repartition
the operator(s) under 9223372036854775807 msgs/s and 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list after repartition:
OperatorMeta{name=Input, operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB,

This message was sent by Atlassian JIRA

View raw message