nifi-commits mailing list archives

From "Mark Payne (JIRA)" <j...@apache.org>
Subject [jira] [Created] (NIFI-1665) GetKafka continually throws NullPointerException if it ever fails to write out message
Date Tue, 22 Mar 2016 14:58:25 GMT
Mark Payne created NIFI-1665:
--------------------------------

             Summary: GetKafka continually throws NullPointerException if it ever fails to write out message
                 Key: NIFI-1665
                 URL: https://issues.apache.org/jira/browse/NIFI-1665
             Project: Apache NiFi
          Issue Type: Bug
            Reporter: Mark Payne


If an Exception is thrown in GetKafka's consumeFromKafka method, execution enters the following catch block:

{code}
        catch (final Exception e) {
            this.shutdownConsumer();
            getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
            if (flowFile != null) {
                session.remove(flowFile);
            }
{code}

This call to shutdownConsumer performs the following:

{code}
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                    getLogger().warn("Executor did not stop in 30 sec. Terminated.");
                }
                this.executor = null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
{code}

Now that this.executor is set to null, every subsequent invocation of the onTrigger method throws a NullPointerException, because it attempts to call executor.submit:

{code}
        synchronized (this.consumerStreamsReady) {
            if (!this.consumerStreamsReady.get()) {
                Future<Void> f = this.executor.submit(new Callable<Void>() {
...
{code}

The same unguarded call also appears here:

{code}
        if (this.consumerStreamsReady.get()) {
            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
...
{code}
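
The failure mode can be reproduced outside NiFi with a small stand-in class. The sketch below is illustrative only: ExecutorLifecycleSketch, submitUnguarded, and submitGuarded are hypothetical names, not part of GetKafka, and the lazy re-creation in submitGuarded is one possible guard under the assumption that a fresh thread pool is acceptable after a shutdown. It is not necessarily how the issue should be or was fixed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the processor; this is NOT the GetKafka class.
class ExecutorLifecycleSketch {
    private ExecutorService executor = Executors.newCachedThreadPool();

    // Mirrors the shutdownConsumer logic quoted above: on a normal return
    // the executor field is left set to null.
    public synchronized void shutdownConsumer() {
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                }
                this.executor = null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Unguarded submit, as in the quoted onTrigger fragments. After
    // shutdownConsumer() has run, this dereferences null and throws
    // NullPointerException on every call.
    public Future<String> submitUnguarded() {
        Callable<String> task = () -> "consumed";
        return this.executor.submit(task);
    }

    // One possible guard (an assumption, not the committed fix): recreate
    // the executor if it is missing or already shut down before submitting.
    public synchronized Future<String> submitGuarded() {
        if (this.executor == null || this.executor.isShutdown()) {
            this.executor = Executors.newCachedThreadPool();
        }
        Callable<String> task = () -> "consumed";
        return this.executor.submit(task);
    }
}
```

Calling shutdownConsumer() and then submitUnguarded() throws the NullPointerException described above, whereas submitGuarded() creates a new executor and returns a usable Future. An alternative design would be to leave the field non-null and re-initialize it in whatever code path re-creates the consumer streams; which option fits GetKafka depends on its lifecycle methods, which are not shown here.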



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)