nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject nifi git commit: NIFI-4756: Updated PublishKafkaRecord processors to include attributes generated from schema write strategy into the message headers when appropriate
Date Tue, 16 Jan 2018 21:33:36 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 28e1bcc9d -> 7c1ce1722


NIFI-4756: Updated PublishKafkaRecord processors to include attributes generated from schema write strategy into the message headers when appropriate

This closes #2396.

Signed-off-by: Bryan Bende <bbende@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7c1ce172
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7c1ce172
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7c1ce172

Branch: refs/heads/master
Commit: 7c1ce172232d5fd8ab2a1c1649a9dcbf1a9d08d7
Parents: 28e1bcc
Author: Mark Payne <markap14@hotmail.com>
Authored: Wed Jan 10 09:04:52 2018 -0500
Committer: Bryan Bende <bbende@apache.org>
Committed: Tue Jan 16 16:31:34 2018 -0500

----------------------------------------------------------------------
 .../WriteAvroSchemaAttributeStrategy.java       |  26 ++-
 .../processors/kafka/pubsub/PublisherLease.java |  24 +-
 .../pubsub/TestPublishKafkaRecord_0_11.java     |   2 +-
 .../kafka/pubsub/TestPublisherLease.java        |   5 +-
 .../nifi/processors/kafka/pubsub/TestUtils.java |  45 ----
 .../kafka/pubsub/util/MockRecordWriter.java     |   2 +-
 .../processors/kafka/test/EmbeddedKafka.java    | 226 -------------------
 .../nifi/processors/kafka/pubsub/TestUtils.java |  45 ----
 .../processors/kafka/pubsub/PublisherLease.java |  24 +-
 .../pubsub/TestPublishKafkaRecord_1_0.java      |   2 +-
 .../kafka/pubsub/TestPublisherLease.java        |   2 +
 .../nifi/processors/kafka/pubsub/TestUtils.java |  45 ----
 .../kafka/pubsub/util/MockRecordWriter.java     |   2 +-
 .../processors/kafka/test/EmbeddedKafka.java    | 226 -------------------
 .../org/apache/nifi/json/WriteJsonResult.java   |   9 +-
 15 files changed, 74 insertions(+), 611 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
index d9be673..5f94679 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.avro.Schema;
@@ -29,6 +31,12 @@ import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
+    private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<RecordSchema, String> eldest) {
+            return size() > 10;
+        }
+    };
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
@@ -36,8 +44,22 @@ public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
 
     @Override
     public Map<String, String> getAttributes(final RecordSchema schema) {
-        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
-        final String schemaText = avroSchema.toString();
+        // First, check if schema has the Avro Text available already.
+        final Optional<String> schemaFormat = schema.getSchemaFormat();
+        if (schemaFormat.isPresent() && AvroTypeUtil.AVRO_SCHEMA_FORMAT.equals(schemaFormat.get())) {
+            final Optional<String> schemaText = schema.getSchemaText();
+            if (schemaText.isPresent()) {
+                return Collections.singletonMap("avro.schema", schemaText.get());
+            }
+        }
+
+        String schemaText = avroSchemaTextCache.get(schema);
+        if (schemaText == null) {
+            final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
+            schemaText = avroSchema.toString();
+            avroSchemaTextCache.put(schema, schemaText);
+        }
+
         return Collections.singletonMap("avro.schema", schemaText);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 72c90d2..2e25129 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
@@ -164,8 +166,10 @@ public class PublisherLease implements Closeable {
                 recordCount++;
                 baos.reset();
 
+                Map<String, String> additionalAttributes = Collections.emptyMap();
                 try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
-                    writer.write(record);
+                    final WriteResult writeResult = writer.write(record);
+                    additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
 
@@ -173,7 +177,7 @@ public class PublisherLease implements Closeable {
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
                 final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
 
-                publish(flowFile, messageKey, messageContent, topic, tracker);
+                publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
 
                 if (tracker.isFailed(flowFile)) {
                     // If we have a failure, don't try to send anything else.
@@ -195,7 +199,7 @@ public class PublisherLease implements Closeable {
         }
     }
 
-    private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
+    private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
         if (attributeNameRegex == null) {
             return;
         }
@@ -206,11 +210,23 @@ public class PublisherLease implements Closeable {
                 headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
             }
         }
+
+        for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
+            if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+                headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
+            }
+        }
     }
 
     protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+        publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
+    }
+
+    protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
+        final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+
         final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
-        addHeaders(flowFile, record);
+        addHeaders(flowFile, additionalAttributes, record);
 
         producer.send(record, new Callback() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
index b7d4abd..9a209d5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java
@@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_11 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_11.REL_SUCCESS, 2);
 
         verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index d2b52dd..3ab7abb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -38,12 +38,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
-import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -270,13 +270,12 @@ public class TestPublisherLease {
         final RecordSet recordSet = reader.createRecordSet();
         final RecordSchema schema = reader.getSchema();
 
-        final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
-
         final String topic = "unit-test";
         final String keyField = "person_id";
 
         final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
         final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+        Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
 
         Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
deleted file mode 100644
index 819e3b7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import sun.misc.Unsafe;
-
-class TestUtils {
-
-    public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
-        field.set(instance, newValue);
-    }
-
-    static Unsafe getUnsafe() {
-        try {
-            Field f = Unsafe.class.getDeclaredField("theUnsafe");
-            f.setAccessible(true);
-            return (Unsafe) f.get(null);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 90a909d..0eb8606 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
 
             @Override
             public WriteResult write(Record record) throws IOException {
-                return null;
+                return WriteResult.of(1, Collections.emptyMap());
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
deleted file mode 100644
index a720b11..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Properties;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
-/**
- * Embedded Kafka server, primarily to be used for testing.
- */
-public class EmbeddedKafka {
-
-    private final KafkaServerStartable kafkaServer;
-
-    private final Properties zookeeperConfig;
-
-    private final Properties kafkaConfig;
-
-    private final ZooKeeperServer zkServer;
-
-    private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
-
-    private final int kafkaPort;
-
-    private final int zookeeperPort;
-
-    private boolean started;
-
-    /**
-     * Will create instance of the embedded Kafka server. Kafka and Zookeeper
-     * configuration properties will be loaded from 'server.properties' and
-     * 'zookeeper.properties' located at the root of the classpath.
-     */
-    public EmbeddedKafka() {
-        this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
-    }
-
-    /**
-     * Will create instance of the embedded Kafka server.
-     *
-     * @param kafkaConfig
-     *            Kafka configuration properties
-     * @param zookeeperConfig
-     *            Zookeeper configuration properties
-     */
-    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
-        this.cleanupKafkaWorkDir();
-        this.zookeeperConfig = zookeeperConfig;
-        this.kafkaConfig = kafkaConfig;
-        this.kafkaPort = this.availablePort();
-        this.zookeeperPort = this.availablePort();
-
-        this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
-        this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
-        this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
-        this.zkServer = new ZooKeeperServer();
-        this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
-    }
-
-    /**
-     *
-     * @return port for Kafka server
-     */
-    public int getKafkaPort() {
-        if (!this.started) {
-            throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
-        }
-        return this.kafkaPort;
-    }
-
-    /**
-     *
-     * @return port for Zookeeper server
-     */
-    public int getZookeeperPort() {
-        if (!this.started) {
-            throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
-        }
-        return this.zookeeperPort;
-    }
-
-    /**
-     * Will start embedded Kafka server. Its data directories will be created
-     * at 'kafka-tmp' directory relative to the working directory of the current
-     * runtime. The data directories will be deleted upon JVM exit.
-     *
-     */
-    public void start() {
-        if (!this.started) {
-            logger.info("Starting Zookeeper server");
-            this.startZookeeper();
-
-            logger.info("Starting Kafka server");
-            this.kafkaServer.startup();
-
-            logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
-                    + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
-            this.started = true;
-        }
-    }
-
-    /**
-     * Will stop embedded Kafka server, cleaning up all working directories.
-     */
-    public void stop() {
-        if (this.started) {
-            logger.info("Shutting down Kafka server");
-            this.kafkaServer.shutdown();
-            this.kafkaServer.awaitShutdown();
-            logger.info("Shutting down Zookeeper server");
-            this.shutdownZookeeper();
-            logger.info("Embedded Kafka is shut down.");
-            this.cleanupKafkaWorkDir();
-            this.started = false;
-        }
-    }
-
-    /**
-     *
-     */
-    private void cleanupKafkaWorkDir() {
-        File kafkaTmp = new File("target/kafka-tmp");
-        try {
-            FileUtils.deleteDirectory(kafkaTmp);
-        } catch (Exception e) {
-            logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
-        }
-    }
-
-    /**
-     * Will start Zookeeper server via {@link ServerCnxnFactory}
-     */
-    private void startZookeeper() {
-        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
-        try {
-            quorumConfiguration.parseProperties(this.zookeeperConfig);
-
-            ServerConfig configuration = new ServerConfig();
-            configuration.readFrom(quorumConfiguration);
-
-            FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
-
-            zkServer.setTxnLogFactory(txnLog);
-            zkServer.setTickTime(configuration.getTickTime());
-            zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
-            zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
-            ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
-            zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
-                    configuration.getMaxClientCnxns());
-            zookeeperConnectionFactory.startup(zkServer);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to start Zookeeper server", e);
-        }
-    }
-
-    /**
-     * Will shut down Zookeeper server.
-     */
-    private void shutdownZookeeper() {
-        zkServer.shutdown();
-    }
-
-    /**
-     * Will load {@link Properties} from properties file discovered at the
-     * provided path relative to the root of the classpath.
-     */
-    private static Properties loadPropertiesFromClasspath(String path) {
-        try {
-            Properties kafkaProperties = new Properties();
-            kafkaProperties.load(Class.class.getResourceAsStream(path));
-            return kafkaProperties;
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    /**
-     * Will determine the available port used by Kafka/Zookeeper servers.
-     */
-    private int availablePort() {
-        ServerSocket s = null;
-        try {
-            s = new ServerSocket(0);
-            s.setReuseAddress(true);
-            return s.getLocalPort();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to discover available port.", e);
-        } finally {
-            try {
-                s.close();
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
deleted file mode 100644
index 819e3b7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import sun.misc.Unsafe;
-
-class TestUtils {
-
-    public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
-        field.set(instance, newValue);
-    }
-
-    static Unsafe getUnsafe() {
-        try {
-            Field f = Unsafe.class.getDeclaredField("theUnsafe");
-            f.setAccessible(true);
-            return (Unsafe) f.get(null);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 2b1cfe2..1c241a4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
@@ -163,8 +165,10 @@ public class PublisherLease implements Closeable {
                 recordCount++;
                 baos.reset();
 
+                Map<String, String> additionalAttributes = Collections.emptyMap();
                 try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
-                    writer.write(record);
+                    final WriteResult writeResult = writer.write(record);
+                    additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
 
@@ -172,7 +176,7 @@ public class PublisherLease implements Closeable {
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
                 final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
 
-                publish(flowFile, messageKey, messageContent, topic, tracker);
+                publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
 
                 if (tracker.isFailed(flowFile)) {
                     // If we have a failure, don't try to send anything else.
@@ -194,7 +198,7 @@ public class PublisherLease implements Closeable {
         }
     }
 
-    private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
+    private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
         if (attributeNameRegex == null) {
             return;
         }
@@ -205,11 +209,23 @@ public class PublisherLease implements Closeable {
                 headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
             }
         }
+
+        for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
+            if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+                headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
+            }
+        }
     }
 
     protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+        publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
+    }
+
+    protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
+        final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+
         final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
-        addHeaders(flowFile, record);
+        addHeaders(flowFile, additionalAttributes, record);
 
         producer.send(record, new Callback() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
index 45439cc..abadc89 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
@@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2);
 
         verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index b2e1b0e..2fbf539 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -43,6 +43,7 @@ import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -274,6 +275,7 @@ public class TestPublisherLease {
 
         final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
         final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+        Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
 
         Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
deleted file mode 100644
index 819e3b7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import sun.misc.Unsafe;
-
-class TestUtils {
-
-    public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
-        field.set(instance, newValue);
-    }
-
-    static Unsafe getUnsafe() {
-        try {
-            Field f = Unsafe.class.getDeclaredField("theUnsafe");
-            f.setAccessible(true);
-            return (Unsafe) f.get(null);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 90a909d..0eb8606 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
 
             @Override
             public WriteResult write(Record record) throws IOException {
-                return null;
+                return WriteResult.of(1, Collections.emptyMap());
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
deleted file mode 100644
index a720b11..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Properties;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-
-/**
- * Embedded Kafka server, primarily to be used for testing.
- */
-public class EmbeddedKafka {
-
-    private final KafkaServerStartable kafkaServer;
-
-    private final Properties zookeeperConfig;
-
-    private final Properties kafkaConfig;
-
-    private final ZooKeeperServer zkServer;
-
-    private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
-
-    private final int kafkaPort;
-
-    private final int zookeeperPort;
-
-    private boolean started;
-
-    /**
-     * Will create instance of the embedded Kafka server. Kafka and Zookeeper
-     * configuration properties will be loaded from 'server.properties' and
-     * 'zookeeper.properties' located at the root of the classpath.
-     */
-    public EmbeddedKafka() {
-        this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
-    }
-
-    /**
-     * Will create instance of the embedded Kafka server.
-     *
-     * @param kafkaConfig
-     *            Kafka configuration properties
-     * @param zookeeperConfig
-     *            Zookeeper configuration properties
-     */
-    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
-        this.cleanupKafkaWorkDir();
-        this.zookeeperConfig = zookeeperConfig;
-        this.kafkaConfig = kafkaConfig;
-        this.kafkaPort = this.availablePort();
-        this.zookeeperPort = this.availablePort();
-
-        this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
-        this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
-        this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
-        this.zkServer = new ZooKeeperServer();
-        this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
-    }
-
-    /**
-     *
-     * @return port for Kafka server
-     */
-    public int getKafkaPort() {
-        if (!this.started) {
-            throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
-        }
-        return this.kafkaPort;
-    }
-
-    /**
-     *
-     * @return port for Zookeeper server
-     */
-    public int getZookeeperPort() {
-        if (!this.started) {
-            throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
-        }
-        return this.zookeeperPort;
-    }
-
-    /**
-     * Will start embedded Kafka server. Its data directories will be created
-     * at 'kafka-tmp' directory relative to the working directory of the current
-     * runtime. The data directories will be deleted upon JVM exit.
-     *
-     */
-    public void start() {
-        if (!this.started) {
-            logger.info("Starting Zookeeper server");
-            this.startZookeeper();
-
-            logger.info("Starting Kafka server");
-            this.kafkaServer.startup();
-
-            logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
-                    + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
-            this.started = true;
-        }
-    }
-
-    /**
-     * Will stop embedded Kafka server, cleaning up all working directories.
-     */
-    public void stop() {
-        if (this.started) {
-            logger.info("Shutting down Kafka server");
-            this.kafkaServer.shutdown();
-            this.kafkaServer.awaitShutdown();
-            logger.info("Shutting down Zookeeper server");
-            this.shutdownZookeeper();
-            logger.info("Embedded Kafka is shut down.");
-            this.cleanupKafkaWorkDir();
-            this.started = false;
-        }
-    }
-
-    /**
-     *
-     */
-    private void cleanupKafkaWorkDir() {
-        File kafkaTmp = new File("target/kafka-tmp");
-        try {
-            FileUtils.deleteDirectory(kafkaTmp);
-        } catch (Exception e) {
-            logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
-        }
-    }
-
-    /**
-     * Will start Zookeeper server via {@link ServerCnxnFactory}
-     */
-    private void startZookeeper() {
-        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
-        try {
-            quorumConfiguration.parseProperties(this.zookeeperConfig);
-
-            ServerConfig configuration = new ServerConfig();
-            configuration.readFrom(quorumConfiguration);
-
-            FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
-
-            zkServer.setTxnLogFactory(txnLog);
-            zkServer.setTickTime(configuration.getTickTime());
-            zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
-            zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
-            ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
-            zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
-                    configuration.getMaxClientCnxns());
-            zookeeperConnectionFactory.startup(zkServer);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to start Zookeeper server", e);
-        }
-    }
-
-    /**
-     * Will shut down Zookeeper server.
-     */
-    private void shutdownZookeeper() {
-        zkServer.shutdown();
-    }
-
-    /**
-     * Will load {@link Properties} from properties file discovered at the
-     * provided path relative to the root of the classpath.
-     */
-    private static Properties loadPropertiesFromClasspath(String path) {
-        try {
-            Properties kafkaProperties = new Properties();
-            kafkaProperties.load(Class.class.getResourceAsStream(path));
-            return kafkaProperties;
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    /**
-     * Will determine the available port used by Kafka/Zookeeper servers.
-     */
-    private int availablePort() {
-        ServerSocket s = null;
-        try {
-            s = new ServerSocket(0);
-            s.setReuseAddress(true);
-            return s.getLocalPort();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to discover available port.", e);
-        } finally {
-            try {
-                s.close();
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7c1ce172/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index fc84181..41a72c7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigInteger;
 import java.text.DateFormat;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -118,30 +117,26 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     public Map<String, String> writeRecord(final Record record) throws IOException {
         // If we are not writing an active record set, then we need to ensure that we write the
         // schema information.
-        boolean firstRecord = false;
         if (!isActiveRecordSet()) {
             generator.flush();
             schemaAccess.writeHeader(recordSchema, getOutputStream());
-            firstRecord = true;
         }
 
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true);
-        return firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
+        return schemaAccess.getAttributes(recordSchema);
     }
 
     @Override
     public WriteResult writeRawRecord(final Record record) throws IOException {
         // If we are not writing an active record set, then we need to ensure that we write the
         // schema information.
-        boolean firstRecord = false;
         if (!isActiveRecordSet()) {
             generator.flush();
             schemaAccess.writeHeader(recordSchema, getOutputStream());
-            firstRecord = true;
         }
 
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false);
-        final Map<String, String> attributes = firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
+        final Map<String, String> attributes = schemaAccess.getAttributes(recordSchema);
         return WriteResult.of(incrementRecordCount(), attributes);
     }
 


Mime
View raw message