nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From phroc...@apache.org
Subject nifi-minifi-cpp git commit: MINIFICPP-52 basic ExtractText processor
Date Thu, 26 Oct 2017 13:54:46 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 1e9cf2ec7 -> 72b9a0e1a


MINIFICPP-52 basic ExtractText processor

Regex support blocked by dynamic properties #37

Requested changes part 1

Use a vector as the buffer, and add Size Limit parameter

This closes #152.

Signed-off-by: Marc Parisi <phrocker@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/72b9a0e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/72b9a0e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/72b9a0e1

Branch: refs/heads/master
Commit: 72b9a0e1a154495c27899cfdc35b443e1c2a34da
Parents: 1e9cf2e
Author: Caleb Johnson <me@calebj.io>
Authored: Tue Oct 17 19:50:13 2017 +0000
Committer: Marc Parisi <phrocker@apache.org>
Committed: Thu Oct 26 09:47:48 2017 -0400

----------------------------------------------------------------------
 README.md                                  |   1 +
 libminifi/include/processors/ExtractText.h |  91 ++++++++++++++++++
 libminifi/src/processors/ExtractText.cpp   | 121 ++++++++++++++++++++++++
 libminifi/test/unit/ExtractTextTests.cpp   | 101 ++++++++++++++++++++
 4 files changed, 314 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 5917f57..8f8af9c 100644
--- a/README.md
+++ b/README.md
@@ -57,6 +57,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a
   * PutFile
   * TailFile
   * MergeContent
+  * ExtractText
 * Provenance events generation is supported and are persisted using RocksDB.
 
 ## System Requirements

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/libminifi/include/processors/ExtractText.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExtractText.h b/libminifi/include/processors/ExtractText.h
new file mode 100644
index 0000000..2d7ba56
--- /dev/null
+++ b/libminifi/include/processors/ExtractText.h
@@ -0,0 +1,91 @@
+/**
+ * @file ExtractText.h
+ * ExtractText class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXTRACT_TEXT_H__
+#define __EXTRACT_TEXT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * ExtractText processor.
+ *
+ * On each trigger it takes one flow file, reads the flow file content through
+ * ReadCallback into the attribute named by the Attribute property, and
+ * transfers the flow file to the Success relationship.
+ */
+class ExtractText : public core::Processor {
+public:
+    /**
+     * Creates the processor and fetches the logger for this class.
+     *
+     * @param name processor name, forwarded to core::Processor
+     * @param uuid processor UUID, forwarded to core::Processor (nullptr by default)
+     */
+    explicit ExtractText(std::string name, uuid_t uuid = nullptr)
+    : Processor(name, uuid),
+      logger_(logging::LoggerFactory<ExtractText>::getLogger())
+    {
+    }
+
+    //! Name constant of this processor
+    static constexpr char const* ProcessorName = "ExtractText";
+
+    //! Property naming the attribute that receives the content
+    static core::Property Attribute;
+    //! Property bounding how many bytes are read into the attribute
+    static core::Property SizeLimit;
+
+    //! Relationship the flow file is transferred to after extraction
+    static core::Relationship Success;
+
+    //! Byte limit (2 * 1024 * 1024) used when the Size Limit property is empty
+    static constexpr int DEFAULT_SIZE_LIMIT = 2 * 1024 * 1024;
+
+    //! Processes one flow file from the session, if one is available
+    void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+    //! Declares the supported properties and relationships
+    void initialize();
+
+    //! Input stream callback that copies flow file content into an attribute
+    class ReadCallback : public InputStreamCallback {
+    public:
+        /**
+         * @param flowFile flow file whose attribute is set by process()
+         * @param ct       context the Attribute and Size Limit properties are read from
+         */
+        ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct);
+        ~ReadCallback() = default;
+
+        //! Reads from the stream and stores what was read in the attribute
+        int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+    private:
+        //! Flow file that receives the attribute
+        std::shared_ptr<core::FlowFile> flowFile_;
+        //! Context used for property lookups
+        core::ProcessContext *ctx_;
+        //! Scratch buffer handed to the stream for each read
+        std::vector<uint8_t> buffer_;
+        //! Maximum number of bytes requested per read
+        int64_t max_read_;
+    };
+
+private:
+    //! Logger for this processor
+    std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(ExtractText);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/libminifi/src/processors/ExtractText.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExtractText.cpp b/libminifi/src/processors/ExtractText.cpp
new file mode 100644
index 0000000..1bb4d9f
--- /dev/null
+++ b/libminifi/src/processors/ExtractText.cpp
@@ -0,0 +1,121 @@
+/**
+ * @file ExtractText.cpp
+ * ExtractText class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <iterator>
+#include <string>
+#include <memory>
+#include <set>
+
+#include <iostream>
+#include <sstream>
+
+#include "processors/ExtractText.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! Name of the flow file attribute that receives the extracted content
+core::Property ExtractText::Attribute("Attribute", "Attribute to set from content", "");
+//! Upper bound on the number of content bytes copied into the attribute.
+//! The description literal must stay on one physical line: a line break inside it is an unterminated string.
+core::Property ExtractText::SizeLimit("Size Limit", "Maximum number of bytes to read into the attribute. 0 for no limit. Default is 2MB.");
+//! Relationship that onTrigger transfers every processed flow file to
+core::Relationship ExtractText::Success("success", "success operational on the flow record");
+
+/**
+ * Registers the properties and relationships supported by this processor.
+ */
+void ExtractText::initialize() {
+    //! Set the supported properties
+    std::set<core::Property> properties;
+    properties.insert(Attribute);
+    // Size Limit is looked up by ReadCallback::process(), so it has to be
+    // registered alongside Attribute.
+    properties.insert(SizeLimit);
+    setSupportedProperties(properties);
+    //! Set the supported relationships
+    std::set<core::Relationship> relationships;
+    relationships.insert(Success);
+    setSupportedRelationships(relationships);
+}
+
+/**
+ * Pulls one flow file from the session, runs a ReadCallback over its content
+ * and transfers it to Success. Does nothing when the session has no flow file.
+ *
+ * @param context context handed to the ReadCallback for property lookups
+ * @param session session the flow file is taken from, read through and transferred on
+ */
+void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+    std::shared_ptr<core::FlowFile> incoming = session->get();
+
+    if (incoming) {
+        ReadCallback extractor(incoming, context);
+        session->read(incoming, &extractor);
+        session->transfer(incoming, Success);
+    }
+}
+
+/**
+ * Copies flow file content from the stream into the configured attribute.
+ *
+ * The number of bytes copied is bounded by the Size Limit property:
+ *  - empty value: DEFAULT_SIZE_LIMIT bytes;
+ *  - a value that parses to zero or less: up to the size of the flow file;
+ *  - any other value: that many bytes.
+ * A Size Limit that is not a number makes std::stoll throw.
+ * Content is requested from the stream in chunks of at most max_read_ bytes.
+ *
+ * @param stream source of the flow file content
+ * @return number of bytes copied into the attribute, or -1 if a read failed
+ *         (the attribute is left untouched in that case)
+ */
+int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+    std::string attrKey, sizeLimitStr;
+    ctx_->getProperty(Attribute.getName(), attrKey);
+    ctx_->getProperty(SizeLimit.getName(), sizeLimitStr);
+
+    // "No limit" is expressed as the full size of the flow file.
+    uint64_t size_limit = flowFile_->getSize();
+    if (sizeLimitStr.empty()) {
+        size_limit = DEFAULT_SIZE_LIMIT;
+    } else {
+        const long long configured = std::stoll(sizeLimitStr);
+        if (configured > 0) {
+            size_limit = static_cast<uint64_t>(configured);
+        }
+    }
+
+    const uint64_t chunk_size = static_cast<uint64_t>(max_read_);
+    uint64_t read_size = 0;
+    std::string contentStr;
+
+    while (read_size < size_limit) {
+        // Never ask for more than what is left of the limit.
+        const uint64_t remaining = size_limit - read_size;
+        const uint64_t loop_read = remaining < chunk_size ? remaining : chunk_size;
+
+        // Make sure the buffer can hold the requested chunk.
+        if (buffer_.size() < loop_read) {
+            buffer_.resize(loop_read);
+        }
+
+        const int64_t ret = stream->readData(buffer_, static_cast<int>(loop_read));
+
+        // Check for failure before the count is used as a size.
+        if (ret < 0) {
+            return -1;
+        }
+
+        // Nothing more to read.
+        if (ret == 0) {
+            break;
+        }
+
+        contentStr.append(reinterpret_cast<const char*>(buffer_.data()), static_cast<size_t>(ret));
+        // Track progress so the limit is enforced and the byte count is reported.
+        read_size += static_cast<uint64_t>(ret);
+    }
+
+    flowFile_->setAttribute(attrKey, contentStr);
+    return static_cast<int64_t>(read_size);
+}
+
+/**
+ * Stores the flow file and context, sets the per-read chunk size to the value
+ * returned by getpagesize() and sizes the read buffer to that chunk size.
+ *
+ * @param flowFile flow file whose attribute is set by process()
+ * @param ctx      context the properties are read from
+ */
+ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx)
+    : flowFile_(flowFile),
+      ctx_(ctx),
+      max_read_(getpagesize()) {
+    // Initializers are listed in member declaration order; buffer_ is sized
+    // here because it is declared before max_read_.
+    buffer_.resize(max_read_);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/libminifi/test/unit/ExtractTextTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ExtractTextTests.cpp b/libminifi/test/unit/ExtractTextTests.cpp
new file mode 100644
index 0000000..de5c591
--- /dev/null
+++ b/libminifi/test/unit/ExtractTextTests.cpp
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include <iostream>
+
+#include "../TestBase.h"
+#include "core/Core.h"
+
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+
+#include "processors/ExtractText.h"
+#include "processors/LogAttribute.h"
+
+const char* TEST_TEXT = "Test text\n";  // content written to the input file and expected in the logged attribute
+const char* TEST_FILE = "test_file.txt";  // name of the input file created inside the temporary directory
+const char* TEST_ATTR = "ExtractedText";  // attribute name given to ExtractText and to LogAttribute
+
+// Checks that an ExtractText instance keeps the name it was constructed with
+// and reports a UUID.
+TEST_CASE("Test Creation of ExtractText", "[extracttextCreate]") {
+    using org::apache::nifi::minifi::processors::ExtractText;
+
+    TestController testController;
+
+    std::shared_ptr<core::Processor> extractor = std::make_shared<ExtractText>("processorname");
+    REQUIRE(extractor->getName() == "processorname");
+
+    uuid_t extractorId;
+    REQUIRE(extractor->getUUID(extractorId));
+}
+
+// Runs GetFile -> ExtractText -> LogAttribute over a file containing TEST_TEXT
+// and checks that the log shows TEST_ATTR holding that text.
+TEST_CASE("Test usage of ExtractText", "[extracttextTest]") {
+    TestController testController;
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ExtractText>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::FlowFile>();
+
+    std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+    char dir[] = "/tmp/gt.XXXXXX";
+
+    REQUIRE(testController.createTempDirectory(dir) != nullptr);
+
+    // GetFile picks the test file up from the temporary directory.
+    std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
+    plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+    plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), "true");
+
+    // ExtractText is configured to write into TEST_ATTR.
+    std::shared_ptr<core::Processor> maprocessor = plan->addProcessor("ExtractText", "testExtractText", core::Relationship("success", "description"), true);
+    plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::Attribute.getName(), TEST_ATTR);
+
+    // LogAttribute is configured to log TEST_ATTR so the result can be checked.
+    std::shared_ptr<core::Processor> laprocessor = plan->addProcessor("LogAttribute", "outputLogAttribute", core::Relationship("success", "description"), true);
+    plan->setProperty(laprocessor, org::apache::nifi::minifi::processors::LogAttribute::AttributesToLog.getName(), TEST_ATTR);
+
+    std::stringstream ss1;
+    ss1 << dir << "/" << TEST_FILE;
+    std::string test_file_path = ss1.str();
+
+    // Fail at this point if the input file cannot be created, instead of
+    // running the plan without any input.
+    std::ofstream test_file(test_file_path);
+    REQUIRE(test_file.is_open());
+    test_file << TEST_TEXT << std::endl;
+    test_file.close();
+
+    plan->runNextProcessor();  // GetFile
+    plan->runNextProcessor();  // ExtractText
+    plan->runNextProcessor();  // LogAttribute
+
+    std::stringstream ss2;
+    ss2 << "key:" << TEST_ATTR << " value:" << TEST_TEXT;
+    std::string log_check = ss2.str();
+
+    REQUIRE(LogTestController::getInstance().contains(log_check));
+
+    LogTestController::getInstance().reset();
+}


Mime
View raw message