Repository: nifi Updated Branches: refs/heads/master c3dbbb845 -> a32e1509b http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java new file mode 100644 index 0000000..5890771 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.beats.response; + +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.beats.frame.BeatsFrame; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; + +/** + * Creates a BeatsFrame for the provided response and returns the encoded frame. + */ +public class BeatsChannelResponse implements ChannelResponse { + + private final BeatsEncoder encoder; + private final BeatsResponse response; + + public BeatsChannelResponse(final BeatsEncoder encoder, final BeatsResponse response) { + this.encoder = encoder; + this.response = response; + } + + @Override + public byte[] toByteArray() { + final BeatsFrame frame = response.toFrame(); + return encoder.encode(frame); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java new file mode 100644 index 0000000..8c51543 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.response; + +import org.apache.nifi.processors.beats.frame.BeatsFrame; + +import java.nio.ByteBuffer; + +/** + 'ack' frame type + + SENT FROM READER ONLY + frame type value: ASCII 'A' aka byte value 0x41 + + Payload: + 32bit unsigned sequence number. + + */ +public class BeatsResponse { + private final int seqNumber; + final private byte version = 0x32; // v2 + final private byte frameType = 0x41; // A or ACK + + + + public BeatsResponse(final int seqNumber) { + this.seqNumber = seqNumber; + } + + /** + * Creates a BeatsFrame where the data portion will contain this response. 
+ * + * + * @return a BeatsFrame for this response + */ + public BeatsFrame toFrame() { + + return new BeatsFrame.Builder() + .version(version) + .frameType(frameType) + .payload(ByteBuffer.allocate(4).putInt(seqNumber).array()) + .build(); + } + + public static BeatsResponse ok(final int seqNumber) { + return new BeatsResponse(seqNumber); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..378f132 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.processors.beats.ListenBeats http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java new file mode 100644 index 0000000..b0fe9c7 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.beats.event; + +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; +import org.junit.Assert; +import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + +public class TestBeatsEventFactory { + + @Test + public void testCreateLumberJackEvent() { + final String sender = "testsender1"; + final byte[] data = "this is a test line".getBytes(); + final int seqNumber = 1; + final String fields = "{\"file\":\"test\"}"; + + + final Map metadata = new HashMap<>(); + metadata.put(EventFactory.SENDER_KEY, sender); + metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber)); + + final ChannelResponder responder = new SocketChannelResponder(null); + + final EventFactory factory = new BeatsEventFactory(); + + final BeatsEvent event = factory.create(data, metadata, responder); + + Assert.assertEquals(sender, event.getSender()); + Assert.assertEquals(seqNumber, event.getSeqNumber()); + Assert.assertEquals(data, event.getData()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java new file mode 100644 index 0000000..8778f84 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.frame; + +import java.nio.ByteBuffer; + +import javax.xml.bind.DatatypeConverter; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestBeatsEncoder { + private BeatsEncoder encoder; + + + @Before + public void setup() { + this.encoder = new BeatsEncoder(); + } + + @Test + public void testEncode() { + BeatsFrame frame = new BeatsFrame.Builder() + .version((byte) 0x31) + .frameType((byte) 0x41) + .payload(ByteBuffer.allocate(4).putInt(123).array()) + .build(); + + byte[] encoded = encoder.encode(frame); + + Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("31410000007B"), encoded); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java new 
file mode 100644 index 0000000..7225f8e --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.beats.frame; + +import org.junit.Test; + + +public class TestBeatsFrame { + + @Test(expected = BeatsFrameException.class) + public void testInvalidVersion() { + new BeatsFrame.Builder().seqNumber(1234).dataSize(3).build(); + } + + @Test(expected = BeatsFrameException.class) + public void testInvalidFrameType() { + new BeatsFrame.Builder().frameType((byte) 0x70).dataSize(5).build(); + } + + @Test(expected = BeatsFrameException.class) + public void testBlankFrameType() { + new BeatsFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java new file mode 100644 index 0000000..ba0a9f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.handler; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.beats.event.BeatsEvent; +import org.apache.nifi.processors.beats.event.BeatsEventFactory; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.frame.BeatsFrame; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + + +public class TestBeatsFrameHandler { + private Charset charset; + private EventFactory eventFactory; + private BlockingQueue events; + private SelectionKey key; + private AsyncChannelDispatcher dispatcher; + private BeatsEncoder encoder; + + private ComponentLog logger; + + private BeatsFrameHandler frameHandler; + + @Before + public void setup() { + this.charset = StandardCharsets.UTF_8; + this.eventFactory = new BeatsEventFactory(); + this.events = new LinkedBlockingQueue<>(); + 
this.key = Mockito.mock(SelectionKey.class); + this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class); + this.logger = Mockito.mock(ComponentLog.class); + + this.frameHandler = new BeatsFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); + } + + @Test + public void testWindow() throws IOException, InterruptedException { + final BeatsFrame openFrame = new BeatsFrame.Builder() + .version((byte) 0x31) + .frameType((byte) 0x57) + .seqNumber(-1) + .payload(Integer.toString(1).getBytes()) + .build(); + + + final String sender = "sender1"; + final CapturingChannelResponder responder = new CapturingChannelResponder(); + + // call the handler with the frame built above + frameHandler.handle(openFrame, responder, sender); + + // No response expected + Assert.assertEquals(0, responder.responded); + } + + @Test + public void testJson() throws IOException, InterruptedException { + final byte jsonPayload[] = new byte[]{ + // Payload eq { "message": "test-content", "field": "value"} + 0x7b, 0x22, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, + 0x65, 0x22, 0x3a, 0x20, + 0x22, 0x74, 0x65, 0x73, + 0x74, 0x2d, 0x63, 0x6f, + 0x6e, 0x74, 0x65, 0x6e, + 0x74, 0x22, 0x2c, 0x20, + 0x22, 0x66, 0x69, 0x65, + 0x6c, 0x64, 0x22, 0x3a, + 0x20, 0x22, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x22, + 0x7d + }; + + final BeatsFrame jsonFrame = new BeatsFrame.Builder() + .version((byte) 0x32) + .frameType((byte) 0x4a) + .seqNumber(1) + .dataSize(45) + + .payload(jsonPayload) + .build(); + + + final String sender = "sender1"; + final CapturingChannelResponder responder = new CapturingChannelResponder(); + + // call the handler with the frame built above + frameHandler.handle(jsonFrame, responder, sender); + + // No response expected + Assert.assertEquals(0, responder.responded); + // But events should contain one event + Assert.assertEquals(1, events.size()); + + final BeatsEvent event = events.poll(); + 
Assert.assertEquals("{\"message\": \"test-content\", \"field\": \"value\"}", new String(event.getData(), charset)); + } + + + private static class CapturingChannelResponder implements ChannelResponder { + + int responded; + List responses = new ArrayList<>(); + + @Override + public SocketChannel getChannel() { + return Mockito.mock(SocketChannel.class); + } + + @Override + public List getResponses() { + return responses; + } + + @Override + public void addResponse(ChannelResponse response) { + responses.add(response); + } + + @Override + public void respond() throws IOException { + responded++; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java new file mode 100644 index 0000000..44f73a2 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.handler; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.net.ssl.SSLContext; +import javax.xml.bind.DatatypeConverter; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processors.beats.event.BeatsMetadata; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + + + + +public class TestBeatsSocketChannelHandler { + private EventFactory eventFactory; + private ChannelHandlerFactory channelHandlerFactory; + private BlockingQueue byteBuffers; + private BlockingQueue events; + private ComponentLog logger = Mockito.mock(ComponentLog.class); + private int maxConnections; + private SSLContext 
sslContext; + private Charset charset; + private ChannelDispatcher dispatcher; + + @Before + public void setup() { + eventFactory = new TestEventHolderFactory(); + channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>(); + + byteBuffers = new LinkedBlockingQueue<>(); + byteBuffers.add(ByteBuffer.allocate(4096)); + + events = new LinkedBlockingQueue<>(); + logger = Mockito.mock(ComponentLog.class); + + maxConnections = 1; + sslContext = null; + charset = StandardCharsets.UTF_8; + + dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, + maxConnections, sslContext, charset); + + } + + @Test + public void testWiredJsonHandling() throws IOException, InterruptedException { + final String singleJsonFrame = "324a000000010000002d7b226d657373616765223a2022746573742d636f6e74656e7422" + + "2c20226669656c64223a202276616c7565227d"; + final List messages = new ArrayList<>(); + messages.add(singleJsonFrame); + + run(messages); + + // Check for the 1 frames (from the hex string above) are back... 
+ Assert.assertEquals(1, events.size()); + + TestEvent event; + while((event = events.poll()) != null) { + Map metadata = event.metadata; + Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY)); + + final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY); + final String line = new String(event.getData()); + Assert.assertTrue(seqNum.equals("1")); + Assert.assertEquals("{\"message\": \"test-content\", \"field\": \"value\"}", line); + } + } + + @Test + public void testCompressedJsonHandling() throws IOException, InterruptedException { + final String multipleJsonFrame = "3243000000E27801CC91414BC3401085477FCA3B6F" + + "93EEB6A5B8A71E3CF5ECC98BECC6491AC86643665290903FAE17A982540F8237E7F" + + "80D3C78EF734722BA21A2B71987C41A9E8306F819FA32303CBADCC020725078D46D" + + "C791836231D0EB7FDB0F933EE9354A2C129A4B44F8B8AF94197D4817CE7CCF67189" + + "CB2E80F74E651DADCC36357D8C2623138689B5834A4011E6E6DF7ABB55DAD770F76" + + "E3B7777EBB299CB58F30903C8D15C3A33CE5C465A8A74ACA2E3792A7B1E25259B4E" + + "87203835CD7C20ABF5FDC91886E89E8F58F237CEEF2EF1A5967BEFBFBD54F8C3162" + + "790F0000FFFF6CB6A08D"; + + final List messages = new ArrayList<>(); + messages.add(multipleJsonFrame); + + run(messages); + + // Check for the 2 frames (from the hex string above) are back... 
+ Assert.assertEquals(2, events.size()); + + boolean found1 = false; + boolean found2 = false; + + + TestEvent event; + while((event = events.poll()) != null) { + Map metadata = event.metadata; + Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY)); + + final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY); + final String line = new String(event.getData()); + + if (seqNum.equals("1") && line.contains("\"message\":\"aaaaaa\"")) { + found1 = true; + } + if (seqNum.equals("2") && line.contains("\"message\":\"bbbb\"")) { + found2 = true; + } + } + Assert.assertTrue(found1 && found2); + } + + + protected void run(List messages) throws IOException, InterruptedException { + final ByteBuffer buffer = ByteBuffer.allocate(1024); + try { + // starts the dispatcher listening on port 0 so it selects a random port + dispatcher.open(null, 0, 4096); + + // starts a thread to run the dispatcher which will accept/read connections + Thread dispatcherThread = new Thread(dispatcher); + dispatcherThread.start(); + + + // create a client connection to the port the dispatcher is listening on + final int realPort = dispatcher.getPort(); + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", realPort)); + Thread.sleep(100); + + // send the provided messages + for (int i=0; i < messages.size(); i++) { + buffer.clear(); + buffer.put(DatatypeConverter.parseHexBinary(messages.get(i))); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(1); + } + } + + // wait up to 10 seconds to verify the responses + long timeout = 10000; + long startTime = System.currentTimeMillis(); + while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) { + Thread.sleep(100); + } + + } finally { + // stop the dispatcher thread and ensure we shut down handler threads + dispatcher.close(); + } + } + + // Test event to produce from the data + private static class 
TestEvent implements Event { + + private byte[] data; + private Map metadata; + + public TestEvent(byte[] data, Map metadata) { + this.data = data; + this.metadata = metadata; + } + + @Override + public String getSender() { + return metadata.get(EventFactory.SENDER_KEY); + } + + @Override + public byte[] getData() { + return data; + } + + @Override + public ChannelResponder getResponder() { + return null; + } + } + + // Factory to create test events and send responses for testing + private static class TestEventHolderFactory implements EventFactory { + + @Override + public TestEvent create(final byte[] data, final Map metadata, final ChannelResponder responder) { + return new TestEvent(data, metadata); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-beats-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-beats-bundle/pom.xml b/nifi-nar-bundles/nifi-beats-bundle/pom.xml new file mode 100644 index 0000000..a3330d9 --- /dev/null +++ b/nifi-nar-bundles/nifi-beats-bundle/pom.xml @@ -0,0 +1,34 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.2.0-SNAPSHOT + + + nifi-beats-bundle + 1.2.0-SNAPSHOT + pom + + + nifi-beats-processors + nifi-beats-nar + + + http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java index 2716cfe..f209c03 100644 --- 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java @@ -62,12 +62,13 @@ import org.apache.nifi.ssl.SSLContextService; import com.google.gson.Gson; +@Deprecated @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"listen", "lumberjack", "tcp", "logs"}) -@CapabilityDescription("Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " + +@CapabilityDescription("This processor is deprecated and may be removed in the near future. Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " + "acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " + "portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " + - "output of this processor can be sent to a ParseSyslog processor for further processing.") + "output of this processor can be sent to a ParseSyslog processor for further processing. 
") @WritesAttributes({ @WritesAttribute(attribute = "lumberjack.sender", description = "The sending host of the messages."), @WritesAttribute(attribute = "lumberjack.port", description = "The sending port the messages were received over."), @@ -76,6 +77,9 @@ import com.google.gson.Gson; @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is text/plain") }) @SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"}) +/** + * @deprecated As of release 1.2.0, replaced by {@link org.apache.nifi.processors.beats.ListenBeats} + * */ public class ListenLumberjack extends AbstractListenEventBatchingProcessor { public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java index c4973bc..c3ddff6 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEvent.java @@ -24,6 +24,7 @@ import java.nio.channels.SocketChannel; /** * A Lumberjack event which adds the transaction number and command to the StandardEvent. 
*/ +@Deprecated public class LumberjackEvent extends StandardEvent { private final long seqNumber; http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java index cdb2be2..3a08c2a 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackEventFactory.java @@ -24,6 +24,7 @@ import java.util.Map; /** * An EventFactory implementation to create LumberjackEvents. 
*/ +@Deprecated public class LumberjackEventFactory implements EventFactory { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java index 1233c27..a2fa452 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/event/LumberjackMetadata.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.event; /** * Metadata keys for Lumberjack. 
*/ +@Deprecated public interface LumberjackMetadata { String SEQNUMBER_KEY = "lumberjack.sequencenumber"; http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java index 9bc9468..7c2f504 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackDecoder.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; * Decodes a Lumberjack frame by maintaining a state based on each byte that has been processed. This class * should not be shared by multiple threads. 
*/ +@Deprecated public class LumberjackDecoder { static final Logger logger = LoggerFactory.getLogger(LumberjackDecoder.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java index e15ec55..cb8a683 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackEncoder.java @@ -22,6 +22,7 @@ import java.io.IOException; /** * Encodes a LumberjackFrame into raw bytes using the given charset. 
*/ +@Deprecated public class LumberjackEncoder { http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java index 7fedf38..809c679 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrame.java @@ -20,6 +20,7 @@ package org.apache.nifi.processors.lumberjack.frame; /** * A Lumberjack frame received from a channel. 
*/ +@Deprecated public class LumberjackFrame { public static final byte DELIMITER = 10; http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java index 61392ff..a388a5b 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackFrameException.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.frame; /** * Represents an error encountered when decoding Lumberjack frames. 
*/ +@Deprecated public class LumberjackFrameException extends RuntimeException { public LumberjackFrameException(String message) { http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java index d3f6dd4..3e3e70c 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/frame/LumberjackState.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.lumberjack.frame; /** * The parts of a Lumberjack frame. 
*/ +@Deprecated public enum LumberjackState { VERSION, http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java index 40c41ce..a70c4f3 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackFrameHandler.java @@ -42,6 +42,7 @@ import com.google.gson.Gson; /** * Encapsulates the logic to handle a LumberjackFrame once it has been read from the channel. 
*/ +@Deprecated public class LumberjackFrameHandler<E extends Event<SocketChannel>> { private final Charset charset; http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java index 7a6fdf1..cdfe8d9 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSSLSocketChannelHandler.java @@ -38,6 +38,7 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; /** * A Lumberjack implementation of SSLSocketChannelHandler. 
*/ +@Deprecated public class LumberjackSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> { private LumberjackDecoder decoder; http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java index 2621222..73fd97f 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandler.java @@ -38,6 +38,7 @@ import org.apache.nifi.processors.lumberjack.frame.LumberjackFrameException; /** * Extends the StandardSocketChannelHandler to decode bytes into Lumberjack frames. 
*/ +@Deprecated public class LumberjackSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> { private LumberjackDecoder decoder; http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java index be34b92..fe287f8 100644 --- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/handler/LumberjackSocketChannelHandlerFactory.java @@ -31,6 +31,7 @@ import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; /** * Default factory for creating Lumberjack socket channel handlers. 
*/ +@Deprecated public class LumberjackSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 02785d5..c1016e7 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -58,6 +58,7 @@ nifi-splunk-bundle nifi-jms-bundle nifi-lumberjack-bundle + nifi-beats-bundle nifi-cassandra-bundle nifi-spring-bundle nifi-hive-bundle http://git-wip-us.apache.org/repos/asf/nifi/blob/a32e1509/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a07e7ff..cec0e67 100644 --- a/pom.xml +++ b/pom.xml @@ -1224,6 +1224,12 @@ language governing permissions and limitations under the License. --> org.apache.nifi + nifi-beats-nar + 1.2.0-SNAPSHOT + nar + + + org.apache.nifi nifi-email-nar 1.2.0-SNAPSHOT nar