nifi-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1296) Add capabilities to Kafka NAR to use new Kafka API (0.9)
Date Fri, 22 Apr 2016 20:25:13 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254585#comment-15254585 ]

ASF GitHub Bot commented on NIFI-1296:
--------------------------------------

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60797930
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    I see this test fail fairly often:
    
    
    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
    Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.811 sec <<< FAILURE! - in org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
    validateLifecycleCorrectnessWithProcessingFailures(org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest)  Time elapsed: 0.694 sec  <<< FAILURE!
    java.lang.AssertionError: expected:<10000> but was:<9999>
    	at org.junit.Assert.fail(Assert.java:88)
    	at org.junit.Assert.failNotEquals(Assert.java:834)
    	at org.junit.Assert.assertEquals(Assert.java:645)
    	at org.junit.Assert.assertEquals(Assert.java:631)
    	at org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest.validateLifecycleCorrectnessWithProcessingFailures(AbstractKafkaProcessorLifecycelTest.java:221)
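    
    The expected:<10000> but was:<9999> off-by-one usually points to a race between the worker threads and the final assertion rather than a real accounting bug. Below is a minimal sketch of the pattern that tends to make this kind of count assertion deterministic, assuming the test drives an ExecutorService and tallies outcomes in atomic counters; the class and variable names are illustrative only and not taken from this PR.
    
    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import org.junit.Test;
    
    public class DeterministicCountingSketch {
    
        @Test
        public void allTasksAreAccountedForBeforeAsserting() throws Exception {
            final int taskCount = 10000;                      // illustrative, mirrors the 10000 in the failure above
            final ExecutorService executor = Executors.newFixedThreadPool(8);
            final CountDownLatch done = new CountDownLatch(taskCount);
            final AtomicInteger successes = new AtomicInteger();
            final AtomicInteger failures = new AtomicInteger();
    
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                executor.execute(() -> {
                    try {
                        if (id % 100 == 0) {                  // simulate sporadic processing failures
                            failures.incrementAndGet();
                        } else {
                            successes.incrementAndGet();
                        }
                    } finally {
                        done.countDown();                     // counted even when the task fails
                    }
                });
            }
    
            // Wait for every task to finish (with a generous timeout) BEFORE asserting;
            // otherwise the assertion can read a counter that is still being updated.
            assertTrue("tasks did not finish in time", done.await(30, TimeUnit.SECONDS));
            executor.shutdown();
    
            assertEquals(taskCount, successes.get() + failures.get());
        }
    }
    
    If the test already awaits completion like this, the remaining suspect is a failure path that skips the tally entirely (e.g., an exception thrown before either counter is touched).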



> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
>                 Key: NIFI-1296
>                 URL: https://issues.apache.org/jira/browse/NIFI-1296
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.4.0
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> Not sure when we can address this, but there is an interesting comment in https://github.com/apache/nifi/pull/143.
> Using the new API may introduce issues when running against older Kafka brokers (e.g., 0.8). Need to investigate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
