nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattyb...@apache.org
Subject [1/2] nifi git commit: NIFI-3238 - Introduce ListenBeats processor and deprecates ListenLumberjack processor
Date Thu, 02 Mar 2017 03:03:05 GMT
Repository: nifi
Updated Branches:
  refs/heads/master c3dbbb845 -> a32e1509b


http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
new file mode 100644
index 0000000..5890771
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.response;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+
+/**
+ * A {@link ChannelResponse} that turns a {@link BeatsResponse} into a frame and encodes it to bytes.
+ */
+public class BeatsChannelResponse implements ChannelResponse {
+
+    private final BeatsEncoder frameEncoder;
+    private final BeatsResponse beatsResponse;
+
+    public BeatsChannelResponse(final BeatsEncoder encoder, final BeatsResponse response) {
+        frameEncoder = encoder;
+        beatsResponse = response;
+    }
+
+    /** @return the encoder's output for the frame built from the wrapped response */
+    @Override
+    public byte[] toByteArray() {
+        return frameEncoder.encode(beatsResponse.toFrame());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
new file mode 100644
index 0000000..8c51543
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.response;
+
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Response carrying the Beats 'ack' frame.
+ *
+ * <p>Sent from the reader only. Frame type value: ASCII 'A', aka byte value 0x41.
+ *
+ * <p>Payload: 32-bit unsigned sequence number, written with {@link ByteBuffer#putInt(int)}
+ * (big-endian, the default byte order of a newly allocated buffer).
+ */
+public class BeatsResponse {
+    // Protocol constants are the same for every response, so they are static rather than per-instance.
+    private static final byte VERSION = 0x32; // v2
+    private static final byte FRAME_TYPE = 0x41; // A or ACK
+
+    private final int seqNumber;
+
+    public BeatsResponse(final int seqNumber) {
+        this.seqNumber = seqNumber;
+    }
+
+    /**
+     * Creates a BeatsFrame where the data portion will contain this response.
+     *
+     * @return a BeatsFrame for this response
+     */
+    public BeatsFrame toFrame() {
+        return new BeatsFrame.Builder()
+                .version(VERSION)
+                .frameType(FRAME_TYPE)
+                .payload(ByteBuffer.allocate(4).putInt(seqNumber).array())
+                .build();
+    }
+
+    /**
+     * @param seqNumber the sequence number to acknowledge
+     * @return a new response wrapping {@code seqNumber}
+     */
+    public static BeatsResponse ok(final int seqNumber) {
+        return new BeatsResponse(seqNumber);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..378f132
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.beats.ListenBeats

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java
new file mode 100644
index 0000000..b0fe9c7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.beats.event;
+
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestBeatsEventFactory {
+
+    /** Checks that the created event reports the sender, sequence number and payload it was built from. */
+    @Test
+    public void testCreateLumberJackEvent() {
+        final String sender = "testsender1";
+        // Explicit charset keeps the payload bytes independent of the platform default encoding.
+        final byte[] data = "this is a test line".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        final int seqNumber = 1;
+
+        final Map<String,String> metadata = new HashMap<>();
+        metadata.put(EventFactory.SENDER_KEY, sender);
+        metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber));
+
+        final ChannelResponder responder = new SocketChannelResponder(null);
+        final EventFactory<BeatsEvent> factory = new BeatsEventFactory();
+
+        final BeatsEvent event = factory.create(data, metadata, responder);
+
+        Assert.assertEquals(sender, event.getSender());
+        Assert.assertEquals(seqNumber, event.getSeqNumber());
+        // assertEquals on arrays falls back to Object.equals (identity); compare the contents instead.
+        Assert.assertArrayEquals(data, event.getData());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
new file mode 100644
index 0000000..8778f84
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.frame;
+
+import java.nio.ByteBuffer;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestBeatsEncoder {
+    private BeatsEncoder encoder;
+
+    @Before
+    public void setup() {
+        encoder = new BeatsEncoder();
+    }
+
+    /** A frame with version 0x31, type 0x41 and the int 123 as payload must encode to 31 41 00 00 00 7B. */
+    @Test
+    public void testEncode() {
+        final byte[] expected = DatatypeConverter.parseHexBinary("31410000007B");
+        final byte[] payload = ByteBuffer.allocate(4).putInt(123).array();
+        final BeatsFrame ackFrame = new BeatsFrame.Builder()
+            .version((byte) 0x31)
+            .frameType((byte) 0x41)
+            .payload(payload)
+            .build();
+        final byte[] actual = encoder.encode(ackFrame);
+        Assert.assertArrayEquals(expected, actual);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
new file mode 100644
index 0000000..7225f8e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.frame;
+
+import org.junit.Test;
+
+
+public class TestBeatsFrame {
+
+    @Test(expected = BeatsFrameException.class)
+    public void testInvalidVersion() { // only seqNumber and dataSize are set; no version is supplied
+        new BeatsFrame.Builder().seqNumber(1234).dataSize(3).build();
+    }
+
+    @Test(expected = BeatsFrameException.class)
+    public void testInvalidFrameType() { // NOTE(review): no version is set here either, so the exception may not be caused by frame type 0x70 - confirm
+        new BeatsFrame.Builder().frameType((byte) 0x70).dataSize(5).build();
+    }
+
+    @Test(expected = BeatsFrameException.class)
+    public void testBlankFrameType() { // same shape as testInvalidFrameType, with frame type 0x00; the version is again unset
+        new BeatsFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java
new file mode 100644
index 0000000..ba0a9f2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.beats.event.BeatsEvent;
+import org.apache.nifi.processors.beats.event.BeatsEventFactory;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+/**
+ * Feeds hand-built frames to a BeatsFrameHandler and checks the queued events and the number of
+ * respond() calls observed by a counting responder.
+ */
+public class TestBeatsFrameHandler {
+    private Charset charset;
+    private EventFactory<BeatsEvent> eventFactory;
+    private BlockingQueue<BeatsEvent> events;
+    private SelectionKey key;
+    private AsyncChannelDispatcher dispatcher;
+    // NOTE(review): never assigned or read anywhere in this class
+    private BeatsEncoder encoder;
+
+    private ComponentLog logger;
+
+    private BeatsFrameHandler<BeatsEvent> frameHandler;
+
+    /**
+     * Builds a fresh handler for every test, backed by an empty event queue and Mockito mocks.
+     */
+    @Before
+    public void setup() {
+        charset = StandardCharsets.UTF_8;
+        eventFactory = new BeatsEventFactory();
+        events = new LinkedBlockingQueue<>();
+        // collaborators the tests never inspect are plain mocks
+        key = Mockito.mock(SelectionKey.class);
+        dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
+        logger = Mockito.mock(ComponentLog.class);
+
+        frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
+    }
+
+    /**
+     * Handling a frame of type 0x57 ('W') must not cause respond() to be called.
+     */
+    @Test
+    public void testWindow() throws IOException, InterruptedException {
+        final byte[] windowPayload = Integer.toString(1).getBytes();
+        final BeatsFrame windowFrame = new BeatsFrame.Builder()
+            .version((byte) 0x31)
+            .frameType((byte) 0x57)
+            .seqNumber(-1)
+            .payload(windowPayload)
+            .build();
+
+        final CapturingChannelResponder capturing = new CapturingChannelResponder();
+        frameHandler.handle(windowFrame, capturing, "sender1");
+
+        // the handler is expected to stay silent for this frame
+        Assert.assertEquals(0, capturing.responded);
+    }
+
+    /**
+     * Handling a frame of type 0x4a ('J') must queue exactly one event whose data equals the frame
+     * payload, again without respond() being called.
+     */
+    @Test
+    public void testJson() throws IOException, InterruptedException {
+        final String json = "{\"message\": \"test-content\", \"field\": \"value\"}";
+        // 45 ASCII characters, so the UTF-8 encoding is 45 bytes long
+        final byte[] jsonPayload = json.getBytes(charset);
+
+        final BeatsFrame jsonFrame = new BeatsFrame.Builder()
+                .version((byte) 0x32)
+                .frameType((byte) 0x4a)
+                .seqNumber(1)
+                .dataSize(jsonPayload.length)
+                .payload(jsonPayload)
+                .build();
+
+        final CapturingChannelResponder capturing = new CapturingChannelResponder();
+        frameHandler.handle(jsonFrame, capturing, "sender1");
+
+        // no response expected ...
+        Assert.assertEquals(0, capturing.responded);
+        // ... but the queue must hold exactly one event
+        Assert.assertEquals(1, events.size());
+
+        final BeatsEvent queued = events.poll();
+        Assert.assertEquals(json, new String(queued.getData(), charset));
+    }
+
+    /**
+     * Responder stub: stores the responses added to it and counts the calls to respond().
+     */
+    private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> {
+
+        int responded;
+        List<ChannelResponse> responses = new ArrayList<>();
+
+        // a new mock channel is created on every call
+        @Override
+        public SocketChannel getChannel() {
+            return Mockito.mock(SocketChannel.class);
+        }
+
+        @Override
+        public List<ChannelResponse> getResponses() {
+            return responses;
+        }
+
+        @Override
+        public void addResponse(ChannelResponse response) {
+            responses.add(response);
+        }
+
+        // nothing is written anywhere; only the invocation is counted
+        @Override
+        public void respond() throws IOException {
+            responded += 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
new file mode 100644
index 0000000..44f73a2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.net.ssl.SSLContext;
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processors.beats.event.BeatsMetadata;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+
+
+/** Sends hex-encoded frames to a SocketChannelDispatcher on a local port and checks the queued events. */
+public class TestBeatsSocketChannelHandler {
+    private EventFactory<TestEvent> eventFactory;
+    private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
+    private BlockingQueue<ByteBuffer> byteBuffers;
+    private BlockingQueue<TestEvent> events;
+    private ComponentLog logger;
+    private int maxConnections;
+    private SSLContext sslContext;
+    private Charset charset;
+    private ChannelDispatcher dispatcher;
+
+    @Before
+    public void setup() {
+        eventFactory = new TestEventHolderFactory();
+        channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
+
+        byteBuffers = new LinkedBlockingQueue<>();
+        byteBuffers.add(ByteBuffer.allocate(4096));
+
+        events = new LinkedBlockingQueue<>();
+        logger = Mockito.mock(ComponentLog.class);
+
+        maxConnections = 1;
+        sslContext = null;
+        charset = StandardCharsets.UTF_8;
+
+        dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
+                maxConnections, sslContext, charset);
+    }
+
+    @Test
+    public void testWiredJsonHandling() throws IOException, InterruptedException {
+        final String singleJsonFrame = "324a000000010000002d7b226d657373616765223a2022746573742d636f6e74656e7422" +
+                "2c20226669656c64223a202276616c7565227d";
+        final List<String> messages = new ArrayList<>();
+        messages.add(singleJsonFrame);
+
+        run(messages, 1);
+
+        // Check that the 1 frame (from the hex string above) is back...
+        Assert.assertEquals(1, events.size());
+
+        TestEvent event;
+        while ((event = events.poll()) != null) {
+            final Map<String, String> metadata = event.metadata;
+            Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
+            final String line = new String(event.getData(), charset);
+            Assert.assertEquals("1", metadata.get(BeatsMetadata.SEQNUMBER_KEY));
+            Assert.assertEquals("{\"message\": \"test-content\", \"field\": \"value\"}", line);
+        }
+    }
+
+    @Test
+    public void testCompressedJsonHandling() throws IOException, InterruptedException {
+        final String multipleJsonFrame = "3243000000E27801CC91414BC3401085477FCA3B6F" +
+                "93EEB6A5B8A71E3CF5ECC98BECC6491AC86643665290903FAE17A982540F8237E7F" +
+                "80D3C78EF734722BA21A2B71987C41A9E8306F819FA32303CBADCC020725078D46D" +
+                "C791836231D0EB7FDB0F933EE9354A2C129A4B44F8B8AF94197D4817CE7CCF67189" +
+                "CB2E80F74E651DADCC36357D8C2623138689B5834A4011E6E6DF7ABB55DAD770F76" +
+                "E3B7777EBB299CB58F30903C8D15C3A33CE5C465A8A74ACA2E3792A7B1E25259B4E" +
+                "87203835CD7C20ABF5FDC91886E89E8F58F237CEEF2EF1A5967BEFBFBD54F8C3162" +
+                "790F0000FFFF6CB6A08D";
+
+        final List<String> messages = new ArrayList<>();
+        messages.add(multipleJsonFrame);
+
+        // the single compressed message carries 2 frames, so wait for 2 events rather than 1
+        run(messages, 2);
+
+        // Check that the 2 frames (from the hex string above) are back...
+        Assert.assertEquals(2, events.size());
+
+        boolean found1 = false;
+        boolean found2 = false;
+        TestEvent event;
+        while ((event = events.poll()) != null) {
+            final Map<String, String> metadata = event.metadata;
+            Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
+            final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY);
+            final String line = new String(event.getData(), charset);
+            if (seqNum.equals("1") && line.contains("\"message\":\"aaaaaa\"")) {
+                found1 = true;
+            }
+            if (seqNum.equals("2") && line.contains("\"message\":\"bbbb\"")) {
+                found2 = true;
+            }
+        }
+        Assert.assertTrue("event with sequence number 1 not found", found1);
+        Assert.assertTrue("event with sequence number 2 not found", found2);
+    }
+
+    /** Sends the messages and waits for one event per message. */
+    protected void run(List<String> messages) throws IOException, InterruptedException {
+        run(messages, messages.size());
+    }
+
+    /**
+     * Opens the dispatcher on a random port, writes every hex-encoded message to it through a client socket and
+     * waits up to 10 seconds until {@code expectedEvents} events are queued. The dispatcher is always closed.
+     * @param messages hex strings that are decoded and written to the socket in order
+     * @param expectedEvents number of queued events to wait for; one message may yield several events
+     */
+    protected void run(List<String> messages, int expectedEvents) throws IOException, InterruptedException {
+        final ByteBuffer buffer = ByteBuffer.allocate(1024);
+        try {
+            // starts the dispatcher listening on port 0 so it selects a random port
+            dispatcher.open(null, 0, 4096);
+            // starts a thread to run the dispatcher which will accept/read connections
+            final Thread dispatcherThread = new Thread(dispatcher);
+            dispatcherThread.start();
+
+            // create a client connection to the port the dispatcher is listening on
+            final int realPort = dispatcher.getPort();
+            try (SocketChannel channel = SocketChannel.open()) {
+                channel.connect(new InetSocketAddress("localhost", realPort));
+                Thread.sleep(100);
+
+                // send the provided messages
+                for (final String message : messages) {
+                    buffer.clear();
+                    buffer.put(DatatypeConverter.parseHexBinary(message));
+                    buffer.flip();
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(1);
+                }
+            }
+
+            // wait up to 10 seconds for the expected number of events
+            final long timeout = 10000;
+            final long startTime = System.currentTimeMillis();
+            while (events.size() < expectedEvents && (System.currentTimeMillis() - startTime < timeout)) {
+                Thread.sleep(100);
+            }
+        } finally {
+            // stop the dispatcher thread and ensure we shut down handler threads
+            dispatcher.close();
+        }
+    }
+
+    // Test event to produce from the data
+    private static class TestEvent implements Event<SocketChannel> {
+        private final byte[] data;
+        private final Map<String, String> metadata;
+
+        public TestEvent(byte[] data, Map<String, String> metadata) {
+            this.data = data;
+            this.metadata = metadata;
+        }
+
+        @Override
+        public String getSender() {
+            return metadata.get(EventFactory.SENDER_KEY);
+        }
+
+        @Override
+        public byte[] getData() {
+            return data;
+        }
+
+        @Override
+        public ChannelResponder<SocketChannel> getResponder() {
+            return null;
+        }
+    }
+
+    // Factory that wraps the data and metadata in a TestEvent; the responder is ignored
+    private static class TestEventHolderFactory implements EventFactory<TestEvent> {
+        @Override
+        public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
+            return new TestEvent(data, metadata);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-beats-bundle/pom.xml b/nifi-nar-bundles/nifi-beats-bundle/pom.xml
new file mode 100644
index 0000000..a3330d9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-beats-bundle</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-beats-processors</module>
+        <module>nifi-beats-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
index 2716cfe..f209c03 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
@@ -62,12 +62,13 @@ import org.apache.nifi.ssl.SSLContextService;
 
 import com.google.gson.Gson;
 
+@Deprecated
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"listen", "lumberjack", "tcp", "logs"})
-@CapabilityDescription("Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
+@CapabilityDescription("This processor is deprecated and may be removed in the near future. Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
     "acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " +
     "portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " +
-    "output of this processor can be sent to a ParseSyslog processor for further processing.")
+    "output of this processor can be sent to a ParseSyslog processor for further processing. ")
 @WritesAttributes({
     @WritesAttribute(attribute = "lumberjack.sender", description = "The sending host of the messages."),
     @WritesAttribute(attribute = "lumberjack.port", description = "The sending port the messages were received over."),
@@ -76,6 +77,9 @@ import com.google.gson.Gson;
     @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain")
 })
 @SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
+/**
+ * @deprecated  As of release 1.2.0, replaced by {@link org.apache.nifi.processors.beats.ListenBeats}
+ * */
 public class ListenLumberjack extends AbstractListenEventBatchingProcessor<LumberjackEvent> {
 
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java
index c4973bc..c3ddff6 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java
@@ -24,6 +24,7 @@ import java.nio.channels.SocketChannel;
 /**
  * A Lumberjack event which adds the transaction number and command to the StandardEvent.
  */
+@Deprecated
 public class LumberjackEvent extends StandardEvent<SocketChannel> {
 
     private final long seqNumber;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java
index cdb2be2..3a08c2a 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
 /**
  * An EventFactory implementation to create LumberjackEvents.
  */
+@Deprecated
 public class LumberjackEventFactory implements EventFactory<LumberjackEvent> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java
index 1233c27..a2fa452 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.event;
 /**
  * Metadata keys for Lumberjack.
  */
+@Deprecated
 public interface LumberjackMetadata {
 
     String SEQNUMBER_KEY = "lumberjack.sequencenumber";

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java
index 9bc9468..7c2f504 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
  * Decodes a Lumberjack frame by maintaining a state based on each byte that has been processed. This class
  * should not be shared by multiple threads.
  */
+@Deprecated
 public class LumberjackDecoder {
 
     static final Logger logger = LoggerFactory.getLogger(LumberjackDecoder.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java
index e15ec55..cb8a683 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 /**
  * Encodes a LumberjackFrame into raw bytes using the given charset.
  */
+@Deprecated
 public class LumberjackEncoder {
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java
index 7fedf38..809c679 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.lumberjack.frame;
 /**
  * A Lumberjack frame received from a channel.
  */
+@Deprecated
 public class LumberjackFrame {
 
     public static final byte DELIMITER = 10;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java
index 61392ff..a388a5b 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.frame;
 /**
  * Represents an error encountered when decoding Lumberjack frames.
  */
+@Deprecated
 public class LumberjackFrameException extends RuntimeException {
 
     public LumberjackFrameException(String message) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java
index d3f6dd4..3e3e70c 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.frame;
 /**
  * The parts of a Lumberjack frame.
  */
+@Deprecated
 public enum LumberjackState {
 
     VERSION,

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java
index 40c41ce..a70c4f3 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java
@@ -42,6 +42,7 @@ import com.google.gson.Gson;
 /**
  * Encapsulates the logic to handle a LumberjackFrame once it has been read from the channel.
  */
+@Deprecated
 public class LumberjackFrameHandler<E extends Event<SocketChannel>> {
 
     private final Charset charset;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java
index 7a6fdf1..cdfe8d9 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java
@@ -38,6 +38,7 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
 /**
  * A Lumberjack implementation of SSLSocketChannelHandler.
  */
+@Deprecated
 public class LumberjackSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
 
     private LumberjackDecoder decoder;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java
index 2621222..73fd97f 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException;
 /**
  * Extends the StandardSocketChannelHandler to decode bytes into Lumberjack frames.
  */
+@Deprecated
 public class LumberjackSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
 
     private LumberjackDecoder decoder;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java
index be34b92..fe287f8 100644
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java
@@ -31,6 +31,7 @@ import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
 /**
  * Default factory for creating Lumberjack socket channel handlers.
  */
+@Deprecated
 public class LumberjackSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 02785d5..c1016e7 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -58,6 +58,7 @@
 	<module>nifi-splunk-bundle</module>
         <module>nifi-jms-bundle</module>
         <module>nifi-lumberjack-bundle</module>
+        <module>nifi-beats-bundle</module>
         <module>nifi-cassandra-bundle</module>
         <module>nifi-spring-bundle</module>
         <module>nifi-hive-bundle</module>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a07e7ff..cec0e67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1224,6 +1224,12 @@ language governing permissions and limitations under the License. -->
             </dependency>
              <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-beats-nar</artifactId>
+                <version>1.2.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+             <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-email-nar</artifactId>
                 <version>1.2.0-SNAPSHOT</version>
                 <type>nar</type>


Mime
View raw message