MINIFI-283
MINIFI-269: Apply 7b322b41b570316ab013b805622884a326554c02 from original example
Isolate Processor Tests and make GetFile be shared pointer
This closes #84.
Signed-off-by: Aldrin Piri <aldrin@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/53f09462
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/53f09462
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/53f09462
Branch: refs/heads/master
Commit: 53f094622d6bea9c86bef5a0a2f7855426e1df95
Parents: 35e24a3
Author: Marc Parisi <phrocker@apache.org>
Authored: Tue May 2 10:18:02 2017 -0400
Committer: Bryan Rosander <brosander@apache.org>
Committed: Tue May 2 12:10:00 2017 -0400
----------------------------------------------------------------------
CMakeLists.txt | 23 ++
libminifi/include/io/BaseStream.h | 26 +-
libminifi/src/io/BaseStream.cpp | 65 ++++-
libminifi/test/ProcessorTests.cpp | 408 ++++++++++++++++++++++++++++
libminifi/test/unit/ProcessorTests.cpp | 395 ---------------------------
libminifi/test/unit/Site2SiteTests.cpp | 14 +-
libminifi/test/unit/SiteToSiteHelper.h | 2 +-
7 files changed, 519 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index df0ed1e..a9ddc07 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -244,6 +244,29 @@ enable_testing(test)
endif(CURL_FOUND)
target_link_libraries(testSockets ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB})
add_test(NAME TestSockets COMMAND testSockets)
+
+ file(GLOB LIBMINIFI_TEST_PROCESSORS "libminifi/test/ProcessorTests.cpp")
+ add_executable(testProcessors ${LIBMINIFI_TEST_PROCESSORS} ${SPD_SOURCES})
+ target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/catch")
+ target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
+ target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/jsoncpp/include")
+ target_include_directories(testProcessors PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
+ target_include_directories(testProcessors PRIVATE BEFORE "include")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/core")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/core/repository")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/io")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/utils")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/processors")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/core/reporting")
+ target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/provenance")
+ if (CURL_FOUND)
+ target_include_directories(testProcessors PRIVATE BEFORE ${CURL_INCLUDE_DIRS})
+ target_link_libraries(testProcessors ${CURL_LIBRARIES})
+ endif(CURL_FOUND)
+ target_link_libraries(testProcessors ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB})
+ add_test(NAME TestProcessors COMMAND testProcessors)
+
# Create a custom build target called "docker" that will invoke DockerBuild.sh and create the NiFi-MiNiFi-CPP Docker image
add_custom_target(
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/include/io/BaseStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h
index e2c0474..6f843bc 100644
--- a/libminifi/include/io/BaseStream.h
+++ b/libminifi/include/io/BaseStream.h
@@ -18,7 +18,7 @@
#ifndef LIBMINIFI_INCLUDE_IO_BASESTREAM_H_
#define LIBMINIFI_INCLUDE_IO_BASESTREAM_H_
-
+#include <iostream>
#include <cstdint>
#include "EndianCheck.h"
#include "DataStream.h"
@@ -33,9 +33,14 @@ namespace io {
class BaseStream : public DataStream, public Serializable {
public:
- BaseStream() {
+ BaseStream()
+ : composable_stream_(this) {
+ }
+ BaseStream(DataStream *other)
+ : composable_stream_(other) {
}
+
virtual ~BaseStream() {
}
@@ -49,6 +54,8 @@ class BaseStream : public DataStream, public Serializable {
virtual int write(uint32_t base_value, bool is_little_endian =
EndiannessCheck::IS_LITTLE);
+ int writeData(uint8_t *value, int size);
+
/**
* write 2 bytes to stream
* @param base_value non encoded value
@@ -101,6 +108,19 @@ class BaseStream : public DataStream, public Serializable {
virtual int read(uint8_t &value);
/**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen);
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+ virtual int readData(uint8_t *buf, int buflen);
+
+ /**
* reads two bytes from the stream
* @param value reference in which will set the result
* @param stream stream from which we will read
@@ -151,6 +171,8 @@ class BaseStream : public DataStream, public Serializable {
* @return resulting read size
**/
virtual int readUTF(std::string &str, bool widen = false);
+ protected:
+ DataStream *composable_stream_;
};
} /* namespace io */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index 8070c38..307f41d 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -16,6 +16,7 @@
* limitations under the License.
*/
#include "io/BaseStream.h"
+#include <vector>
#include <string>
#include "io/Serializable.h"
@@ -32,10 +33,19 @@ namespace io {
* @return resulting write size
**/
int BaseStream::write(uint32_t base_value, bool is_little_endian) {
- return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+ return Serializable::write(base_value,
+ reinterpret_cast<DataStream*>(composable_stream_),
is_little_endian);
}
+int BaseStream::writeData(uint8_t *value, int size) {
+ if (composable_stream_ == this) {
+ return DataStream::writeData(value, size);
+ } else {
+ return composable_stream_->writeData(value, size);
+ }
+}
+
/**
* write 2 bytes to stream
* @param base_value non encoded value
@@ -44,7 +54,8 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(uint16_t base_value, bool is_little_endian) {
- return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+ return Serializable::write(base_value,
+ reinterpret_cast<DataStream*>(composable_stream_),
is_little_endian);
}
@@ -56,7 +67,8 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
* @return resulting write size
**/
int BaseStream::write(uint8_t *value, int len) {
- return Serializable::write(value, len, reinterpret_cast<DataStream*>(this));
+ return Serializable::write(value, len,
+ reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -67,7 +79,8 @@ int BaseStream::write(uint8_t *value, int len) {
* @return resulting write size
**/
int BaseStream::write(uint64_t base_value, bool is_little_endian) {
- return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+ return Serializable::write(base_value,
+ reinterpret_cast<DataStream*>(composable_stream_),
is_little_endian);
}
@@ -87,7 +100,8 @@ int BaseStream::write(bool value) {
* @return resulting write size
**/
int BaseStream::writeUTF(std::string str, bool widen) {
- return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(this), widen);
+ return Serializable::writeUTF(
+ str, reinterpret_cast<DataStream*>(composable_stream_), widen);
}
/**
@@ -97,7 +111,8 @@ int BaseStream::writeUTF(std::string str, bool widen) {
* @return resulting read size
**/
int BaseStream::read(uint8_t &value) {
- return Serializable::read(value, reinterpret_cast<DataStream*>(this));
+ return Serializable::read(value,
+ reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -107,7 +122,8 @@ int BaseStream::read(uint8_t &value) {
* @return resulting read size
**/
int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
- return Serializable::read(base_value, reinterpret_cast<DataStream*>(this));
+ return Serializable::read(base_value,
+ reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -117,7 +133,8 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(char &value) {
- return Serializable::read(value, reinterpret_cast<DataStream*>(this));
+ return Serializable::read(value,
+ reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -128,7 +145,27 @@ int BaseStream::read(char &value) {
* @return resulting read size
**/
int BaseStream::read(uint8_t *value, int len) {
- return Serializable::read(value, len, reinterpret_cast<DataStream*>(this));
+ return Serializable::read(value, len,
+ reinterpret_cast<DataStream*>(composable_stream_));
+}
+
+/**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) {
+ return Serializable::read(&buf[0], buflen,
+ reinterpret_cast<DataStream*>(composable_stream_));
+}
+/**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+int BaseStream::readData(uint8_t *buf, int buflen) {
+ return Serializable::read(buf, buflen,
+ reinterpret_cast<DataStream*>(composable_stream_));
}
/**
@@ -138,7 +175,8 @@ int BaseStream::read(uint8_t *value, int len) {
* @return resulting read size
**/
int BaseStream::read(uint32_t &value, bool is_little_endian) {
- return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+ return Serializable::read(value,
+ reinterpret_cast<DataStream*>(composable_stream_),
is_little_endian);
}
@@ -149,7 +187,8 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::read(uint64_t &value, bool is_little_endian) {
- return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+ return Serializable::read(value,
+ reinterpret_cast<DataStream*>(composable_stream_),
is_little_endian);
}
@@ -160,9 +199,9 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
* @return resulting read size
**/
int BaseStream::readUTF(std::string &str, bool widen) {
- return Serializable::readUTF(str, reinterpret_cast<DataStream*>(this), widen);
+ return Serializable::readUTF(
+ str, reinterpret_cast<DataStream*>(composable_stream_), widen);
}
-
} /* namespace io */
} /* namespace minifi */
} /* namespace nifi */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/ProcessorTests.cpp b/libminifi/test/ProcessorTests.cpp
new file mode 100644
index 0000000..ebc408b
--- /dev/null
+++ b/libminifi/test/ProcessorTests.cpp
@@ -0,0 +1,408 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include "unit/ProvenanceTestHelper.h"
+#include "TestBase.h"
+#include "core/logging/LogAppenders.h"
+#include "core/logging/BaseLogger.h"
+#include "processors/GetFile.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+
+
+
+TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
+ std::shared_ptr<core::Processor> processor = std::make_shared<
+ org::apache::nifi::minifi::processors::GetFile>("processorname");
+ REQUIRE(processor->getName() == "processorname");
+}
+
+TEST_CASE("Test Find file", "[getfileCreate2]") {
+
+ TestController testController;
+
+ testController.enableDebug();
+
+ std::shared_ptr<core::Processor> processor = std::make_shared<
+ org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+ std::shared_ptr<core::Processor> processorReport =
+ std::make_shared<
+ org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>();
+
+ std::shared_ptr<core::Repository> test_repo =
+ std::make_shared<TestRepository>();
+
+ std::shared_ptr<TestRepository> repo =
+ std::static_pointer_cast<TestRepository>(test_repo);
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+ TestFlowController>(test_repo, test_repo);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ uuid_t processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<
+ minifi::Connection>(test_repo, "getfileCreate2Connection");
+ connection->setRelationship(core::Relationship("success", "description"));
+
+ // link the connections so that we can test results at the end for this
+ connection->setSource(processor);
+ connection->setDestination(processor);
+
+ connection->setSourceUUID(processoruuid);
+ connection->setDestinationUUID(processoruuid);
+
+ processor->addConnection(connection);
+ REQUIRE(dir != NULL);
+
+ core::ProcessorNode node(processor);
+
+ core::ProcessContext context(node, test_repo);
+ core::ProcessSessionFactory factory(&context);
+ context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
+ dir);
+ core::ProcessSession session(&context);
+
+ processor->onSchedule(&context, &factory);
+ REQUIRE(processor->getName() == "getfileCreate2");
+
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onTrigger(&context, &session);
+
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ REQUIRE(records.size() == 0);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onTrigger(&context, &session);
+ unlink(ss.str().c_str());
+ reporter = session.getProvenanceReporter();
+
+ REQUIRE(processor->getName() == "getfileCreate2");
+
+ records = reporter->getEvents();
+
+ for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+ REQUIRE(provEventRecord->getComponentType() == processor->getName());
+ }
+ session.commit();
+ std::shared_ptr<core::FlowFile> ffr = session.get();
+
+ ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ REQUIRE(2 == repo->getRepoMap().size());
+
+ for (auto entry : repo->getRepoMap()) {
+ provenance::ProvenanceEventRecord newRecord;
+ newRecord.DeSerialize((uint8_t*) entry.second.data(),
+ entry.second.length());
+
+ bool found = false;
+ for (auto provRec : records) {
+ if (provRec->getEventId() == newRecord.getEventId()) {
+ REQUIRE(provRec->getEventId() == newRecord.getEventId());
+ REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
+ REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
+ REQUIRE(provRec->getDetails() == newRecord.getDetails());
+ REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ throw std::runtime_error("Did not find record");
+
+ }
+
+ core::ProcessorNode nodeReport(processorReport);
+ core::ProcessContext contextReport(nodeReport, test_repo);
+ core::ProcessSessionFactory factoryReport(&contextReport);
+ core::ProcessSession sessionReport(&contextReport);
+ processorReport->onSchedule(&contextReport, &factoryReport);
+ std::shared_ptr<
+ org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport =
+ std::static_pointer_cast<
+ org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
+ processorReport);
+ taskReport->setBatchSize(1);
+ std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> recordsReport;
+ processorReport->incrementActiveTasks();
+ processorReport->setScheduledState(core::ScheduledState::RUNNING);
+ std::string jsonStr;
+ repo->getProvenanceRecord(recordsReport, 1);
+ taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport,
+ jsonStr);
+ REQUIRE(recordsReport.size() == 1);
+ REQUIRE(
+ taskReport->getName()
+ == std::string(
+ org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
+ REQUIRE(
+ jsonStr.find("\"componentType\" : \"getfileCreate2\"")
+ != std::string::npos);
+}
+
+TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
+
+ TestController testController;
+
+ testController.enableDebug();
+
+ std::shared_ptr<core::Processor> processor = std::make_shared<
+ org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+ std::shared_ptr<core::Repository> test_repo =
+ std::make_shared<TestRepository>();
+
+ std::shared_ptr<TestRepository> repo =
+ std::static_pointer_cast<TestRepository>(test_repo);
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+ TestFlowController>(test_repo, test_repo);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ uuid_t processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<
+ minifi::Connection>(test_repo, "getfileCreate2Connection");
+ connection->setRelationship(core::Relationship("success", "description"));
+
+ // link the connections so that we can test results at the end for this
+ connection->setSource(processor);
+ connection->setDestination(processor);
+
+ connection->setSourceUUID(processoruuid);
+ connection->setDestinationUUID(processoruuid);
+
+ processor->addConnection(connection);
+ REQUIRE(dir != NULL);
+
+ core::ProcessorNode node(processor);
+ core::ProcessContext context(node, test_repo);
+ core::ProcessSessionFactory factory(&context);
+ context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
+ dir);
+ // replicate 10 threads
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(&context, &factory);
+
+ int prev = 0;
+ for (int i = 0; i < 10; i++) {
+
+ core::ProcessSession session(&context);
+ REQUIRE(processor->getName() == "getfileCreate2");
+
+ std::shared_ptr<core::FlowFile> record;
+
+ processor->onTrigger(&context, &session);
+
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> records =
+ reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ REQUIRE(records.size() == 0);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onTrigger(&context, &session);
+ unlink(ss.str().c_str());
+ reporter = session.getProvenanceReporter();
+
+ REQUIRE(processor->getName() == "getfileCreate2");
+
+ records = reporter->getEvents();
+
+ for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+ REQUIRE(provEventRecord->getComponentType() == processor->getName());
+ }
+ session.commit();
+ std::shared_ptr<core::FlowFile> ffr = session.get();
+
+ REQUIRE((repo->getRepoMap().size() % 2) == 0);
+ REQUIRE(repo->getRepoMap().size() == (prev + 2));
+ prev += 2;
+
+ }
+
+}
+
+TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
+ std::ostringstream oss;
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
+ new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+ 0));
+ std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+ logger->updateLogger(std::move(outputLogger));
+
+ TestController testController;
+
+ testController.enableDebug();
+
+ std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
+
+ std::shared_ptr<core::Processor> processor = std::make_shared<
+ org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+ std::shared_ptr<core::Processor> logAttribute = std::make_shared<
+ org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ uuid_t processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+
+ uuid_t logattribute_uuid;
+ REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
+
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<
+ minifi::Connection>(repo, "getfileCreate2Connection");
+ connection->setRelationship(core::Relationship("success", "description"));
+
+ std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
+ minifi::Connection>(repo, "logattribute");
+ connection2->setRelationship(core::Relationship("success", "description"));
+
+ // link the connections so that we can test results at the end for this
+ connection->setSource(processor);
+
+ // link the connections so that we can test results at the end for this
+ connection->setDestination(logAttribute);
+
+ connection2->setSource(logAttribute);
+
+ connection2->setSourceUUID(logattribute_uuid);
+ connection->setSourceUUID(processoruuid);
+ connection->setDestinationUUID(logattribute_uuid);
+
+ processor->addConnection(connection);
+ logAttribute->addConnection(connection);
+ logAttribute->addConnection(connection2);
+ REQUIRE(dir != NULL);
+
+ core::ProcessorNode node(processor);
+ core::ProcessorNode node2(logAttribute);
+
+ core::ProcessContext context(node, repo);
+ core::ProcessContext context2(node2, repo);
+ context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
+ dir);
+ core::ProcessSession session(&context);
+ core::ProcessSession session2(&context2);
+
+ REQUIRE(processor->getName() == "getfileCreate2");
+
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+
+ core::ProcessSessionFactory factory(&context);
+ processor->onSchedule(&context, &factory);
+ processor->onTrigger(&context, &session);
+
+ logAttribute->incrementActiveTasks();
+ logAttribute->setScheduledState(core::ScheduledState::RUNNING);
+ core::ProcessSessionFactory factory2(&context2);
+ logAttribute->onSchedule(&context2, &factory2);
+ logAttribute->onTrigger(&context2, &session2);
+
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ REQUIRE(records.size() == 0);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onTrigger(&context, &session);
+ unlink(ss.str().c_str());
+ reporter = session.getProvenanceReporter();
+
+ records = reporter->getEvents();
+ session.commit();
+ oss.str("");
+ oss.clear();
+
+ logAttribute->incrementActiveTasks();
+ logAttribute->setScheduledState(core::ScheduledState::RUNNING);
+ logAttribute->onTrigger(&context2, &session2);
+
+ //session2.commit();
+ records = reporter->getEvents();
+
+ std::string log_attribute_output = oss.str();
+ REQUIRE(
+ log_attribute_output.find("key:absolute.path value:" + ss.str())
+ != std::string::npos);
+ REQUIRE(log_attribute_output.find("Size:8 Offset:0") != std::string::npos);
+ REQUIRE(
+ log_attribute_output.find("key:path value:" + std::string(dir))
+ != std::string::npos);
+
+ outputLogger = std::unique_ptr<logging::BaseLogger>(
+ new org::apache::nifi::minifi::core::logging::NullAppender());
+ logger->updateLogger(std::move(outputLogger));
+
+}
+
+int fileSize(const char *add) {
+ std::ifstream mySource;
+ mySource.open(add, std::ios_base::binary);
+ mySource.seekg(0, std::ios_base::end);
+ int size = mySource.tellg();
+ mySource.close();
+ return size;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
deleted file mode 100644
index 6432c48..0000000
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <uuid/uuid.h>
-#include <fstream>
-#include "FlowController.h"
-#include "ProvenanceTestHelper.h"
-#include "../TestBase.h"
-#include "core/logging/LogAppenders.h"
-#include "core/logging/BaseLogger.h"
-#include "processors/GetFile.h"
-#include "core/Core.h"
-#include "../../include/core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-
-TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
- org::apache::nifi::minifi::processors::GetFile processor("processorname");
- REQUIRE(processor.getName() == "processorname");
-}
-
-TEST_CASE("Test Find file", "[getfileCreate2]") {
-
- TestController testController;
-
- testController.enableDebug();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<
- org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
- std::shared_ptr<core::Processor> processorReport = std::make_shared
- < org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask
- > ();
-
- std::shared_ptr<core::Repository> test_repo =
- std::make_shared<TestRepository>();
-
- std::shared_ptr<TestRepository> repo =
- std::static_pointer_cast<TestRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<
- TestFlowController>(test_repo, test_repo);
-
- char format[] = "/tmp/gt.XXXXXX";
- char *dir = testController.createTempDirectory(format);
-
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<
- minifi::Connection>(test_repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor);
- connection->setDestination(processor);
-
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(processoruuid);
-
- processor->addConnection(connection);
- REQUIRE(dir != NULL);
-
- core::ProcessorNode node(processor);
-
- core::ProcessContext context(node, test_repo);
- core::ProcessSessionFactory factory(&context);
- context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
- dir);
- core::ProcessSession session(&context);
-
- processor->onSchedule(&context, &factory);
- REQUIRE(processor->getName() == "getfileCreate2");
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- std::fstream file;
- std::stringstream ss;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
- unlink(ss.str().c_str());
- reporter = session.getProvenanceReporter();
-
- REQUIRE(processor->getName() == "getfileCreate2");
-
- records = reporter->getEvents();
-
- for (provenance::ProvenanceEventRecord *provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == processor->getName());
- }
- session.commit();
- std::shared_ptr<core::FlowFile> ffr = session.get();
-
- ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- REQUIRE(2 == repo->getRepoMap().size());
-
- for (auto entry : repo->getRepoMap()) {
- provenance::ProvenanceEventRecord newRecord;
- newRecord.DeSerialize((uint8_t*) entry.second.data(),
- entry.second.length());
-
- bool found = false;
- for (auto provRec : records) {
- if (provRec->getEventId() == newRecord.getEventId()) {
- REQUIRE(provRec->getEventId() == newRecord.getEventId());
- REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
- REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
- REQUIRE(provRec->getDetails() == newRecord.getDetails());
- REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
- found = true;
- break;
- }
- }
- if (!found)
- throw std::runtime_error("Did not find record");
-
- }
-
- core::ProcessorNode nodeReport(processorReport);
- core::ProcessContext contextReport(nodeReport, test_repo);
- core::ProcessSessionFactory factoryReport(&contextReport);
- core::ProcessSession sessionReport(&contextReport);
- processorReport->onSchedule(&contextReport, &factoryReport);
- std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast
- < org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask > (processorReport);
- taskReport->setBatchSize(1);
- std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> recordsReport;
- processorReport->incrementActiveTasks();
- processorReport->setScheduledState(core::ScheduledState::RUNNING);
- std::string jsonStr;
- repo->getProvenanceRecord(recordsReport, 1);
- taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport, jsonStr);
- REQUIRE(recordsReport.size() == 1);
- REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
- REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
-}
-
-TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
-
- TestController testController;
-
- testController.enableDebug();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<
- org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
- std::shared_ptr<core::Repository> test_repo =
- std::make_shared<TestRepository>();
-
- std::shared_ptr<TestRepository> repo =
- std::static_pointer_cast<TestRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<
- TestFlowController>(test_repo, test_repo);
-
- char format[] = "/tmp/gt.XXXXXX";
- char *dir = testController.createTempDirectory(format);
-
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<
- minifi::Connection>(test_repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor);
- connection->setDestination(processor);
-
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(processoruuid);
-
- processor->addConnection(connection);
- REQUIRE(dir != NULL);
-
- core::ProcessorNode node(processor);
- core::ProcessContext context(node, test_repo);
- core::ProcessSessionFactory factory(&context);
- context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
- dir);
- // replicate 10 threads
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
-
- int prev = 0;
- for (int i = 0; i < 10; i++) {
-
- core::ProcessSession session(&context);
- REQUIRE(processor->getName() == "getfileCreate2");
-
- std::shared_ptr<core::FlowFile> record;
-
- processor->onTrigger(&context, &session);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records =
- reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- std::fstream file;
- std::stringstream ss;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
- unlink(ss.str().c_str());
- reporter = session.getProvenanceReporter();
-
- REQUIRE(processor->getName() == "getfileCreate2");
-
- records = reporter->getEvents();
-
- for (provenance::ProvenanceEventRecord *provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == processor->getName());
- }
- session.commit();
- std::shared_ptr<core::FlowFile> ffr = session.get();
-
- REQUIRE((repo->getRepoMap().size()%2) == 0);
- REQUIRE(repo->getRepoMap().size() == (prev+2));
- prev+=2;
-
- }
-
-}
-
-TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
- std::ostringstream oss;
- std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
- logging::BaseLogger>(
- new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
- 0));
- std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
- logger->updateLogger(std::move(outputLogger));
-
- TestController testController;
-
- testController.enableDebug();
-
- std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<
- org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
- std::shared_ptr<core::Processor> logAttribute = std::make_shared<
- org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- char format[] = "/tmp/gt.XXXXXX";
- char *dir = testController.createTempDirectory(format);
-
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
-
- uuid_t logattribute_uuid;
- REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<
- minifi::Connection>(repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
- minifi::Connection>(repo, "logattribute");
- connection2->setRelationship(core::Relationship("success", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor);
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(logAttribute);
-
- connection2->setSource(logAttribute);
-
- connection2->setSourceUUID(logattribute_uuid);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logattribute_uuid);
-
- processor->addConnection(connection);
- logAttribute->addConnection(connection);
- logAttribute->addConnection(connection2);
- REQUIRE(dir != NULL);
-
- core::ProcessorNode node(processor);
- core::ProcessorNode node2(logAttribute);
-
- core::ProcessContext context(node, repo);
- core::ProcessContext context2(node2, repo);
- context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
- dir);
- core::ProcessSession session(&context);
- core::ProcessSession session2(&context2);
-
- REQUIRE(processor->getName() == "getfileCreate2");
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
-
- core::ProcessSessionFactory factory(&context);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
-
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- core::ProcessSessionFactory factory2(&context2);
- logAttribute->onSchedule(&context2, &factory2);
- logAttribute->onTrigger(&context2, &session2);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- std::fstream file;
- std::stringstream ss;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
- unlink(ss.str().c_str());
- reporter = session.getProvenanceReporter();
-
- records = reporter->getEvents();
- session.commit();
- oss.str("");
- oss.clear();
-
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- logAttribute->onTrigger(&context2, &session2);
-
- //session2.commit();
- records = reporter->getEvents();
-
- std::string log_attribute_output = oss.str();
- REQUIRE(
- log_attribute_output.find("key:absolute.path value:" + ss.str())
- != std::string::npos);
- REQUIRE(log_attribute_output.find("Size:8 Offset:0") != std::string::npos);
- REQUIRE(
- log_attribute_output.find("key:path value:" + std::string(dir))
- != std::string::npos);
-
- outputLogger = std::unique_ptr<logging::BaseLogger>(
- new org::apache::nifi::minifi::core::logging::NullAppender());
- logger->updateLogger(std::move(outputLogger));
-
-}
-
-int fileSize(const char *add) {
- std::ifstream mySource;
- mySource.open(add, std::ios_base::binary);
- mySource.seekg(0, std::ios_base::end);
- int size = mySource.tellg();
- mySource.close();
- return size;
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/unit/Site2SiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp
index a67881d..f831efe 100644
--- a/libminifi/test/unit/Site2SiteTests.cpp
+++ b/libminifi/test/unit/Site2SiteTests.cpp
@@ -100,7 +100,15 @@ void sunny_path_bootstrap(SiteToSiteResponder *collector) {
}
TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
-
+ std::ostringstream oss;
+
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
+ new org::apache::nifi::minifi::core::logging::OutputStreamAppender(
+ std::cout, minifi::Configure::getConfigure()));
+ std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+ logger->updateLogger(std::move(outputLogger));
+ logger->setLogLevel("trace");
SiteToSiteResponder *collector = new SiteToSiteResponder();
sunny_path_bootstrap(collector);
@@ -108,7 +116,7 @@ TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr
< minifi::Site2SitePeer
> (new minifi::Site2SitePeer(
- std::unique_ptr < minifi::io::DataStream > (collector), "fake_host",
+ std::unique_ptr < minifi::io::DataStream > (new BaseStream(collector)), "fake_host",
65433));
minifi::Site2SiteClientProtocol protocol(std::move(peer));
@@ -184,7 +192,7 @@ TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr
< minifi::Site2SitePeer
> (new minifi::Site2SitePeer(
- std::unique_ptr < minifi::io::DataStream > (collector), "fake_host",
+ std::unique_ptr < minifi::io::DataStream > (new BaseStream(collector)), "fake_host",
65433));
minifi::Site2SiteClientProtocol protocol(std::move(peer));
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/unit/SiteToSiteHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index 36a7000..8c33396 100755
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -137,7 +137,7 @@ public:
virtual int read(uint64_t &value, bool is_little_endian =
minifi::io::EndiannessCheck::IS_LITTLE) {
value = std::stoull(get_next_response());
- return 4;
+ return 8;
}
/**
|