Return-Path: X-Original-To: apmail-nifi-dev-archive@minotaur.apache.org Delivered-To: apmail-nifi-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4AC101946D for ; Sat, 23 Apr 2016 12:48:24 +0000 (UTC) Received: (qmail 47420 invoked by uid 500); 23 Apr 2016 12:48:24 -0000 Delivered-To: apmail-nifi-dev-archive@nifi.apache.org Received: (qmail 47368 invoked by uid 500); 23 Apr 2016 12:48:24 -0000 Mailing-List: contact dev-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list dev@nifi.apache.org Received: (qmail 47357 invoked by uid 99); 23 Apr 2016 12:48:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Apr 2016 12:48:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B4C92DFF46; Sat, 23 Apr 2016 12:48:23 +0000 (UTC) From: pvillard31 To: dev@nifi.apache.org Reply-To: dev@nifi.apache.org References: In-Reply-To: Subject: [GitHub] nifi pull request: NIFI-1296, NIFI-1680, NIFI-1764 implemented new... Content-Type: text/plain Message-Id: <20160423124823.B4C92DFF46@git1-us-west.apache.org> Date: Sat, 23 Apr 2016 12:48:23 +0000 (UTC) Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/366#discussion_r60828026 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.lang.reflect.Field; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.MockSessionFactory; +import org.apache.nifi.util.SharedSessionState; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class AbstractKafkaProcessorLifecycelTest { + + @Test + public void validateBaseProperties() throws Exception { + TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class); + runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, ""); + runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo"); + runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); + + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo"); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid")); + } + runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234"); + + runner.removeProperty(ConsumeKafka.TOPIC); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("'topic' is invalid because topic is required")); + } + + runner.setProperty(ConsumeKafka.TOPIC, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafka.TOPIC, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + runner.setProperty(ConsumeKafka.TOPIC, "blah"); + + runner.removeProperty(ConsumeKafka.CLIENT_ID); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("invalid because client.id is required")); + } + + runner.setProperty(ConsumeKafka.CLIENT_ID, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafka.CLIENT_ID, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj"); + + runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + } + + /* + * The goal of this test is to validate the correctness of + * AbstractKafkaProcessor's implementation of onTrigger() in a highly + * concurrent environment. That is: + * - upon processing failures (e.g., unhandled exceptions), the target Kafka + * resource is reset (closed and re-created) + * - no FlowFile is unaccounted for. FlowFiles left in the queue and FlowFiles + * in Success relationship = testCount + * - failed executions that did not result in the call to close/reset summed with + * verified calls to close should equal total request failed + */ + @Test + public void validateLifecycleCorrectnessWithProcessingFailures() throws Exception { --- End diff -- @olegz @joewitt I do see the same test failure on my side and also the one below: ```` Running org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 52.39 sec <<< FAILURE! - in org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest validateGetAllMessagesWithProvidedDemarcator(org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest) Time elapsed: 1.112 sec <<< FAILURE! org.junit.ComparisonFailure: expected:<[????????????]-3> but was:<[????????????]-3> at org.junit.Assert.assertEquals(Assert.java:115) at org.junit.Assert.assertEquals(Assert.java:144) at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaTest.validateGetAllMessagesWithProvidedDemarcator(ConsumeKafkaTest.java:191) ```` But maybe something is needed on my side to support Russian and Japanese characters? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---