nifi-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-2732) ConsumeKafka 0.9 and 0.10 not handling partition reassignment case sufficiently
Date Tue, 06 Sep 2016 14:45:21 GMT

    [ https://issues.apache.org/jira/browse/NIFI-2732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15467584#comment-15467584 ]

ASF GitHub Bot commented on NIFI-2732:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/987#discussion_r77649182
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
---
    @@ -47,22 +156,244 @@
          * kafka client to collect more data from Kafka before committing the
          * offsets.
          *
    -     * @param offsets offsets
    -     * @throws KafkaException if issue occurs talking to underlying resource.
    +     * if false then we didn't do anything and should probably yield; if true
    +     * then we committed new data
    +     *
    +     */
    +    boolean commit() {
    +        if (uncommittedOffsetsMap.isEmpty()) {
    +            resetInternalState();
    +            return false;
    +        }
    +        try {
    +            /**
    +             * Committing the nifi session then the offsets means we have an at
    +             * least once guarantee here. If we reversed the order we'd have at
    +             * most once.
    +             */
    +            final Collection<FlowFile> bundledFlowFiles = getBundles();
    +            if (!bundledFlowFiles.isEmpty()) {
    +                getProcessSession().transfer(getBundles(), REL_SUCCESS);
    --- End diff --
    
    Is there a reason we're calling getBundles() again here instead of just calling transfer
    with the bundledFlowFiles variable? getBundles() is fairly expensive to call again needlessly.
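A minimal sketch of the change the reviewer is suggesting, assuming a heavily simplified stand-in for ConsumerLease: getBundles() is computed once and the local bundledFlowFiles variable is reused for the transfer, and the NiFi session is committed before the offsets, which gives the at-least-once ordering described in the diff's comment. LeaseSketch, the String "bundles", and the transferred/events lists are hypothetical placeholders, not the real NiFi or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the lease in the diff; bundles are plain Strings
// and "transfer"/"commit" just record what would have happened.
class LeaseSketch {
    final Map<String, Long> uncommittedOffsetsMap = new HashMap<>();
    final List<String> pendingBundles = new ArrayList<>();
    final List<String> transferred = new ArrayList<>();
    final List<String> events = new ArrayList<>();
    int getBundlesCalls = 0;

    // Stands in for the "fairly expensive" getBundles(): builds a new collection on every call.
    Collection<String> getBundles() {
        getBundlesCalls++;
        return new ArrayList<>(pendingBundles);
    }

    boolean commit() {
        if (uncommittedOffsetsMap.isEmpty()) {
            return false; // nothing to commit; the caller should probably yield
        }
        // Compute the bundles once and reuse the local variable for the transfer.
        final Collection<String> bundledFlowFiles = getBundles();
        if (!bundledFlowFiles.isEmpty()) {
            // Stands in for getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS).
            transferred.addAll(bundledFlowFiles);
        }
        // Session first, then offsets: a failure between the two replays data
        // (at least once) rather than losing it (at most once).
        events.add("session-commit");
        events.add("offset-commit:" + uncommittedOffsetsMap);
        uncommittedOffsetsMap.clear();
        pendingBundles.clear();
        return true;
    }
}
```

With this shape a single commit() costs one getBundles() call regardless of whether any bundles were produced.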


> ConsumeKafka 0.9 and 0.10 not handling partition reassignment case sufficiently
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-2732
>                 URL: https://issues.apache.org/jira/browse/NIFI-2732
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Joseph Witt
>            Assignee: Joseph Witt
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> The new ConsumeKafka clients handle the threading model of the consumer API correctly.
> However, they do not yet handle partition reassignment sufficiently, which means we
> could see avoidable duplication. By registering a partition reassignment listener we
> can handle it correctly.
> Further, the processor loads subsequent polls of messages into memory rather than
> writing them directly to the process session/disk. Writing them to disk instead would
> give far better performance and efficiency. Early testing easily achieves 100 MB/s
> sustained per thread on a simple laptop setup with default settings, which will scale
> very nicely on a properly installed content repository.
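The partition reassignment listener idea can be sketched as follows. Kafka's ConsumerRebalanceListener exposes onPartitionsRevoked and onPartitionsAssigned callbacks; to keep this example compilable without the Kafka client jar, it uses a hypothetical stand-in interface with the same two callbacks and plain String partition names instead of TopicPartition. The assumption is that committing progress for revoked partitions before they move to another consumer avoids the new owner re-reading records this consumer already handed off. A real processor would also need to commit the NiFi session first, which is omitted here.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in mirroring the two callbacks of Kafka's ConsumerRebalanceListener;
// partitions are Strings here instead of TopicPartition.
interface RebalanceListenerSketch {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical consumer-side state tracking offsets consumed but not yet committed.
class OffsetTrackingListener implements RebalanceListenerSketch {
    final Map<String, Long> uncommittedOffsets = new HashMap<>();
    final Map<String, Long> committedOffsets = new HashMap<>();

    void recordConsumed(String partition, long offset) {
        // Kafka's committed offset is the next offset to read, hence offset + 1.
        uncommittedOffsets.put(partition, offset + 1);
    }

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        // Commit progress only for partitions we are about to lose.
        for (String partition : partitions) {
            Long next = uncommittedOffsets.remove(partition);
            if (next != null) {
                committedOffsets.put(partition, next);
            }
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        // Nothing is buffered yet for newly assigned partitions; consumption
        // resumes from whatever offsets were last committed for them.
    }
}
```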

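The second point, writing polled messages straight to the session/disk instead of accumulating them in memory, can be illustrated with a stdlib-only sketch. Here a plain OutputStream stands in for a NiFi session write/append stream backed by the content repository; StreamingBundleWriter and the demarcator handling are hypothetical illustrations, not the processor's actual code. Each poll's records are written as they arrive, so memory holds at most one poll rather than the whole bundle.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: append each polled record directly to an output stream
// (standing in for a content-repository-backed session stream) with a demarcator
// between records, instead of buffering every poll in an in-memory list.
class StreamingBundleWriter {
    private final OutputStream out;
    private final byte[] demarcator;
    private boolean first = true;
    long recordsWritten = 0;

    StreamingBundleWriter(OutputStream out, byte[] demarcator) {
        this.out = out;
        this.demarcator = demarcator;
    }

    // Called once per poll; the demarcator state carries across polls.
    void writePoll(Iterable<byte[]> records) {
        try {
            for (byte[] record : records) {
                if (!first) {
                    out.write(demarcator);
                }
                out.write(record);
                first = false;
                recordsWritten++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```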


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
