nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [1/2] incubator-nifi git commit: NIFI-377: Add EncodeContent processor
Date Mon, 02 Mar 2015 15:07:38 GMT
Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 099865a1b -> fff37591a


NIFI-377: Add EncodeContent processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b2935410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b2935410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b2935410

Branch: refs/heads/develop
Commit: b2935410a2bcae73848e817578f2e777df517a45
Parents: 50744bf
Author: Adam Lamar <adamonduty@gmail.com>
Authored: Fri Feb 20 15:50:46 2015 +0000
Committer: Adam Lamar <adamonduty@gmail.com>
Committed: Sat Feb 28 20:28:37 2015 +0000

----------------------------------------------------------------------
 .../nifi/processors/standard/EncodeContent.java | 248 +++++++++++++++++++
 .../util/ValidatingBase32InputStream.java       |  78 ++++++
 .../processors/standard/TestEncodeContent.java  | 164 ++++++++++++
 3 files changed, 490 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2935410/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
new file mode 100644
index 0000000..2465b56
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Base32InputStream;
+import org.apache.commons.codec.binary.Base32OutputStream;
+
+import org.apache.commons.codec.binary.Base64InputStream;
+import org.apache.commons.codec.binary.Base64OutputStream;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processors.standard.util.ValidatingBase32InputStream;
+import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"experimental", "encode", "decode", "base64", "base32", "hex"})
+@CapabilityDescription("Encodes or decodes the FlowFile content using base64, base32, or hex encoding")
+public class EncodeContent extends AbstractProcessor {
+
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
+
+    // List of supported encodings.
+    public static final String BASE64_ENCODING = "base64";
+    public static final String BASE32_ENCODING = "base32";
+    public static final String HEX_ENCODING    = "hex";
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("Mode")
+            .description("Specifies whether the content should be encoded or decoded")
+            .required(true)
+            .allowableValues(ENCODE_MODE, DECODE_MODE)
+            .defaultValue(ENCODE_MODE)
+            .build();
+
+    public static final PropertyDescriptor ENCODING = new PropertyDescriptor.Builder()
+            .name("Encoding")
+            .description("Specifies the type of encoding used")
+            .required(true)
+            .allowableValues(BASE64_ENCODING, BASE32_ENCODING, HEX_ENCODING)
+            .defaultValue(BASE64_ENCODING)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encoded or decoded will be routed to success").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encoded or decoded will be routed to failure").build();
+
+    /** Upper-case hexadecimal digits, indexed by nibble value; used by {@link EncodeHex}. */
+    private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7',
+            '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    /**
+     * Builds the unmodifiable property list (Mode, Encoding) and relationship
+     * set (success, failure) returned by the accessors below.
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(MODE);
+        props.add(ENCODING);
+        this.properties = Collections.unmodifiableList(props);
+
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Takes one FlowFile from the session, rewrites its content with the
+     * encoder or decoder selected by the Mode and Encoding properties, and
+     * routes it to {@link #REL_SUCCESS}. Any exception raised while rewriting
+     * the content routes the FlowFile to {@link #REL_FAILURE}.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        final String encoding = context.getProperty(ENCODING).getValue();
+        final StreamCallback encoder = selectCallback(encode, encoding);
+
+        if (encoder == null) {
+            logger.warn("Unknown operation: {} {}", new Object[]{encode ? "encode" : "decode", encoding});
+            // The FlowFile was already taken from the session, so it must be
+            // routed somewhere; returning without a transfer leaves it unaccounted for.
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            flowFile = session.write(flowFile, encoder);
+
+            logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
+            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Picks the stream callback for the requested operation.
+     *
+     * @param encode   true to encode, false to decode
+     * @param encoding one of the *_ENCODING constants, compared case-insensitively
+     * @return the matching callback, or null when the encoding is not recognised
+     */
+    private static StreamCallback selectCallback(final boolean encode, final String encoding) {
+        if (encoding.equalsIgnoreCase(BASE64_ENCODING)) {
+            return encode ? new EncodeBase64() : new DecodeBase64();
+        }
+        if (encoding.equalsIgnoreCase(BASE32_ENCODING)) {
+            return encode ? new EncodeBase32() : new DecodeBase32();
+        }
+        if (encoding.equalsIgnoreCase(HEX_ENCODING)) {
+            return encode ? new EncodeHex() : new DecodeHex();
+        }
+        return null;
+    }
+
+    /** Copies the input through a {@link Base64OutputStream}. */
+    private static class EncodeBase64 implements StreamCallback {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+            // Closing the wrapper lets it emit any pending final block.
+            try (Base64OutputStream bos = new Base64OutputStream(out)) {
+                StreamUtils.copy(in, bos);
+            }
+        }
+    }
+
+    /** Copies the input through a validating wrapper and a {@link Base64InputStream}. */
+    private static class DecodeBase64 implements StreamCallback {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+            try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
+                StreamUtils.copy(bis, out);
+            }
+        }
+    }
+
+    /** Copies the input through a {@link Base32OutputStream}. */
+    private static class EncodeBase32 implements StreamCallback {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+            // Closing the wrapper lets it emit any pending final block.
+            try (Base32OutputStream bos = new Base32OutputStream(out)) {
+                StreamUtils.copy(in, bos);
+            }
+        }
+    }
+
+    /** Copies the input through a validating wrapper and a {@link Base32InputStream}. */
+    private static class DecodeBase32 implements StreamCallback {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+            try (Base32InputStream bis = new Base32InputStream(new ValidatingBase32InputStream(in))) {
+                StreamUtils.copy(bis, out);
+            }
+        }
+    }
+
+    /** Writes every input byte as two upper-case hexadecimal digits. */
+    private static class EncodeHex implements StreamCallback {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+            int len;
+            byte[] inBuf = new byte[8192];
+            // Each input byte expands to two output characters.
+            byte[] outBuf = new byte[inBuf.length * 2];
+            while ((len = in.read(inBuf)) > 0) {
+                for (int i = 0; i < len; i++) {
+                    outBuf[i * 2] = HEX_CHARS[(inBuf[i] & 0xF0) >>> 4];
+                    outBuf[i * 2 + 1] = HEX_CHARS[inBuf[i] & 0x0F];
+                }
+                out.write(outBuf, 0, len * 2);
+            }
+            out.flush();
+        }
+    }
+
+    /**
+     * Decodes hexadecimal text back into bytes, chunk by chunk. A failure
+     * reported by {@link Hex#decode(byte[])} is rethrown as an IOException.
+     */
+    private static class DecodeHex implements StreamCallback {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+            int len;
+            byte[] inBuf = new byte[8192];
+            Hex h = new Hex();
+            while ((len = in.read(inBuf)) > 0) {
+                // A hex pair must not be split across chunks: if the chunk has
+                // odd length, try to get another byte. The buffer length is
+                // even, so an odd len always leaves room at inBuf[len].
+                if (len % 2 != 0) {
+                    int b = in.read();
+                    if (b != -1) {
+                        inBuf[len] = (byte) b;
+                        len++;
+                    }
+                }
+
+                // Construct a new buffer bounded to len
+                byte[] slice = Arrays.copyOfRange(inBuf, 0, len);
+                try {
+                    out.write(h.decode(slice));
+                } catch (DecoderException ex) {
+                    throw new IOException(ex);
+                }
+            }
+            out.flush();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2935410/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java
new file mode 100644
index 0000000..f7ebeab
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import org.apache.commons.codec.binary.Base32;
+
+
+/**
+ * An InputStream that throws an IOException if any byte is read that is not a
+ * valid Base32 character. Whitespace and the padding character are considered
+ * valid. End of stream is passed through unchanged and is never validated.
+ */
+public class ValidatingBase32InputStream extends FilterInputStream {
+
+    private final Base32 b32 = new Base32();
+
+    public ValidatingBase32InputStream(InputStream in) {
+        super(in);
+    }
+
+    /**
+     * Reads into {@code b} and validates exactly the bytes that were read,
+     * i.e. the region {@code [offset, offset + numRead)}.
+     *
+     * @return the number of bytes read, or -1 at end of stream
+     * @throws IOException if the underlying read fails or a byte read is not
+     *         Base32 data
+     */
+    @Override
+    public int read(byte[] b, int offset, int len) throws IOException {
+        int numRead = super.read(b, offset, len);
+        if (numRead > 0) {
+            byte[] copy = b;
+            if (offset != 0 || numRead < b.length) {
+                // isInAlphabet checks the whole byte[], so limit it to the
+                // region that was actually filled by this read
+                copy = Arrays.copyOfRange(b, offset, offset + numRead);
+            }
+            validate(copy);
+        }
+        return numRead;
+    }
+
+    /**
+     * Equivalent to {@code read(b, 0, b.length)}; delegating directly means
+     * the data is validated once rather than twice.
+     */
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Reads and validates a single byte.
+     *
+     * @return the byte read (0-255), or -1 at end of stream
+     * @throws IOException if the underlying read fails or the byte is not
+     *         Base32 data
+     */
+    @Override
+    public int read() throws IOException {
+        int data = super.read();
+        // -1 signals end of stream; it is not data and must not be validated
+        if (data != -1) {
+            validate(new byte[]{(byte) data});
+        }
+        // return the byte that was validated instead of consuming another one
+        return data;
+    }
+
+    /** Throws if any byte is outside the Base32 alphabet, whitespace, or padding. */
+    private void validate(byte[] bytes) throws IOException {
+        if (!b32.isInAlphabet(bytes, true)) {
+            throw new IOException("Data is not base32 encoded.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2935410/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
new file mode 100644
index 0000000..fec411d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.Test;
+
+/**
+ * Exercises EncodeContent in both modes for each supported encoding: a
+ * round trip must reproduce the original content, and decoding content that
+ * is not in the selected encoding must route the FlowFile to failure.
+ */
+public class TestEncodeContent {
+
+    /** Plain-text fixture used as the source content by most tests. */
+    private static final String HELLO_FILE = "src/test/resources/hello.txt";
+
+    @Test
+    public void testBase64RoundTrip() throws IOException {
+        assertRoundTrip(EncodeContent.BASE64_ENCODING);
+    }
+
+    @Test
+    public void testFailDecodeNotBase64() throws IOException {
+        assertDecodingHelloFileFails(EncodeContent.BASE64_ENCODING);
+    }
+
+    @Test
+    public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() throws IOException {
+        final TestRunner testRunner = newRunner(EncodeContent.DECODE_MODE, EncodeContent.BASE64_ENCODING);
+
+        // Explicit charset so the input bytes do not depend on the platform default.
+        testRunner.enqueue("four@@@@multiple".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        runAndAssertFailure(testRunner);
+    }
+
+    @Test
+    public void testBase32RoundTrip() throws IOException {
+        assertRoundTrip(EncodeContent.BASE32_ENCODING);
+    }
+
+    @Test
+    public void testFailDecodeNotBase32() throws IOException {
+        assertDecodingHelloFileFails(EncodeContent.BASE32_ENCODING);
+    }
+
+    @Test
+    public void testHexRoundTrip() throws IOException {
+        assertRoundTrip(EncodeContent.HEX_ENCODING);
+    }
+
+    @Test
+    public void testFailDecodeNotHex() throws IOException {
+        assertDecodingHelloFileFails(EncodeContent.HEX_ENCODING);
+    }
+
+    /** Creates a runner for a fresh EncodeContent with the given Mode and Encoding. */
+    private static TestRunner newRunner(final String mode, final String encoding) {
+        final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent());
+        testRunner.setProperty(EncodeContent.MODE, mode);
+        testRunner.setProperty(EncodeContent.ENCODING, encoding);
+        return testRunner;
+    }
+
+    /**
+     * Encodes the fixture file, feeds the result back through the same runner
+     * in decode mode, and asserts the decoded content equals the fixture.
+     */
+    private static void assertRoundTrip(final String encoding) throws IOException {
+        final TestRunner testRunner = newRunner(EncodeContent.ENCODE_MODE, encoding);
+
+        testRunner.enqueue(Paths.get(HELLO_FILE));
+        testRunner.clearTransferState();
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
+        testRunner.assertQueueEmpty();
+
+        testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
+        testRunner.enqueue(flowFile);
+        testRunner.clearTransferState();
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
+
+        flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(new File(HELLO_FILE));
+    }
+
+    /** Decodes the (unencoded) fixture file and expects it to be routed to failure. */
+    private static void assertDecodingHelloFileFails(final String encoding) throws IOException {
+        final TestRunner testRunner = newRunner(EncodeContent.DECODE_MODE, encoding);
+        testRunner.enqueue(Paths.get(HELLO_FILE));
+        runAndAssertFailure(testRunner);
+    }
+
+    /** Runs the processor once and asserts the single FlowFile went to failure. */
+    private static void runAndAssertFailure(final TestRunner testRunner) {
+        testRunner.clearTransferState();
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_FAILURE, 1);
+    }
+}


Mime
View raw message