nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From olegz <...@git.apache.org>
Subject [GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new...
Date Sat, 23 Apr 2016 13:44:13 GMT
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60828729
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception {
    --- End diff --
    
    @pvillard31 Great catch! Need to change line:188 to look like this
    ```
    String[] events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah");
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message