nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brosan...@apache.org
Subject [2/2] nifi-minifi-cpp git commit: MINIFI-283
Date Tue, 02 May 2017 16:12:46 GMT
MINIFI-283

MINIFI-269: Apply 7b322b41b570316ab013b805622884a326554c02 from original example

Isolate Processor Tests and make GetFile be shared pointer

This closes #84.

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/53f09462
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/53f09462
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/53f09462

Branch: refs/heads/master
Commit: 53f094622d6bea9c86bef5a0a2f7855426e1df95
Parents: 35e24a3
Author: Marc Parisi <phrocker@apache.org>
Authored: Tue May 2 10:18:02 2017 -0400
Committer: Bryan Rosander <brosander@apache.org>
Committed: Tue May 2 12:10:00 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                         |  23 ++
 libminifi/include/io/BaseStream.h      |  26 +-
 libminifi/src/io/BaseStream.cpp        |  65 ++++-
 libminifi/test/ProcessorTests.cpp      | 408 ++++++++++++++++++++++++++++
 libminifi/test/unit/ProcessorTests.cpp | 395 ---------------------------
 libminifi/test/unit/Site2SiteTests.cpp |  14 +-
 libminifi/test/unit/SiteToSiteHelper.h |   2 +-
 7 files changed, 519 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index df0ed1e..a9ddc07 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -244,6 +244,29 @@ enable_testing(test)
     endif(CURL_FOUND)
     target_link_libraries(testSockets ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB})
     add_test(NAME TestSockets COMMAND testSockets)
+    
+    file(GLOB LIBMINIFI_TEST_PROCESSORS "libminifi/test/ProcessorTests.cpp")
+    add_executable(testProcessors ${LIBMINIFI_TEST_PROCESSORS} ${SPD_SOURCES})
+    target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/catch")
+    target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
+    target_include_directories(testProcessors PRIVATE BEFORE "thirdparty/jsoncpp/include")
+    target_include_directories(testProcessors PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS})
+    target_include_directories(testProcessors PRIVATE BEFORE "include")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/core")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/core/repository")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/io")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/utils")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/processors")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/core/reporting")
+    target_include_directories(testProcessors PRIVATE BEFORE "libminifi/include/provenance")
+    if (CURL_FOUND)
+        target_include_directories(testProcessors PRIVATE BEFORE ${CURL_INCLUDE_DIRS})
+    	target_link_libraries(testProcessors ${CURL_LIBRARIES})
+    endif(CURL_FOUND)
+    target_link_libraries(testProcessors ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB})
+    add_test(NAME TestProcessors COMMAND testProcessors)
+    
 
 # Create a custom build target called "docker" that will invoke DockerBuild.sh and create the NiFi-MiNiFi-CPP Docker image
 add_custom_target(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/include/io/BaseStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h
index e2c0474..6f843bc 100644
--- a/libminifi/include/io/BaseStream.h
+++ b/libminifi/include/io/BaseStream.h
@@ -18,7 +18,7 @@
 
 #ifndef LIBMINIFI_INCLUDE_IO_BASESTREAM_H_
 #define LIBMINIFI_INCLUDE_IO_BASESTREAM_H_
-
+#include <iostream>
 #include <cstdint>
 #include "EndianCheck.h"
 #include "DataStream.h"
@@ -33,9 +33,14 @@ namespace io {
 class BaseStream : public DataStream, public Serializable {
 
  public:
-  BaseStream() {
+  BaseStream()
+      : composable_stream_(this) {
+  }
 
+  BaseStream(DataStream *other)
+      : composable_stream_(other) {
   }
+
   virtual ~BaseStream() {
 
   }
@@ -49,6 +54,8 @@ class BaseStream : public DataStream, public Serializable {
   virtual int write(uint32_t base_value, bool is_little_endian =
                         EndiannessCheck::IS_LITTLE);
 
+  int writeData(uint8_t *value, int size);
+
   /**
    * write 2 bytes to stream
    * @param base_value non encoded value
@@ -101,6 +108,19 @@ class BaseStream : public DataStream, public Serializable {
   virtual int read(uint8_t &value);
 
   /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
    * reads two bytes from the stream
    * @param value reference in which will set the result
    * @param stream stream from which we will read
@@ -151,6 +171,8 @@ class BaseStream : public DataStream, public Serializable {
    * @return resulting read size
    **/
   virtual int readUTF(std::string &str, bool widen = false);
+ protected:
+  DataStream *composable_stream_;
 };
 
 } /* namespace io */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/src/io/BaseStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp
index 8070c38..307f41d 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/libminifi/src/io/BaseStream.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #include "io/BaseStream.h"
+#include <vector>
 #include <string>
 #include "io/Serializable.h"
 
@@ -32,10 +33,19 @@ namespace io {
  * @return resulting write size
  **/
 int BaseStream::write(uint32_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+  return Serializable::write(base_value,
+                             reinterpret_cast<DataStream*>(composable_stream_),
                              is_little_endian);
 }
 
+int BaseStream::writeData(uint8_t *value, int size) {
+  if (composable_stream_ == this) {
+    return DataStream::writeData(value, size);
+  } else {
+    return composable_stream_->writeData(value, size);
+  }
+}
+
 /**
  * write 2 bytes to stream
  * @param base_value non encoded value
@@ -44,7 +54,8 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(uint16_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+  return Serializable::write(base_value,
+                             reinterpret_cast<DataStream*>(composable_stream_),
                              is_little_endian);
 }
 
@@ -56,7 +67,8 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) {
  * @return resulting write size
  **/
 int BaseStream::write(uint8_t *value, int len) {
-  return Serializable::write(value, len, reinterpret_cast<DataStream*>(this));
+  return Serializable::write(value, len,
+                             reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -67,7 +79,8 @@ int BaseStream::write(uint8_t *value, int len) {
  * @return resulting write size
  **/
 int BaseStream::write(uint64_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value, reinterpret_cast<DataStream*>(this),
+  return Serializable::write(base_value,
+                             reinterpret_cast<DataStream*>(composable_stream_),
                              is_little_endian);
 }
 
@@ -87,7 +100,8 @@ int BaseStream::write(bool value) {
  * @return resulting write size
  **/
 int BaseStream::writeUTF(std::string str, bool widen) {
-  return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(this), widen);
+  return Serializable::writeUTF(
+      str, reinterpret_cast<DataStream*>(composable_stream_), widen);
 }
 
 /**
@@ -97,7 +111,8 @@ int BaseStream::writeUTF(std::string str, bool widen) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t &value) {
-  return Serializable::read(value, reinterpret_cast<DataStream*>(this));
+  return Serializable::read(value,
+                            reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -107,7 +122,8 @@ int BaseStream::read(uint8_t &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
-  return Serializable::read(base_value, reinterpret_cast<DataStream*>(this));
+  return Serializable::read(base_value,
+                            reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -117,7 +133,8 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(char &value) {
-  return Serializable::read(value, reinterpret_cast<DataStream*>(this));
+  return Serializable::read(value,
+                            reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -128,7 +145,27 @@ int BaseStream::read(char &value) {
  * @return resulting read size
  **/
 int BaseStream::read(uint8_t *value, int len) {
-  return Serializable::read(value, len, reinterpret_cast<DataStream*>(this));
+  return Serializable::read(value, len,
+                            reinterpret_cast<DataStream*>(composable_stream_));
+}
+
+/**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) {
+  return Serializable::read(&buf[0], buflen,
+                            reinterpret_cast<DataStream*>(composable_stream_));
+}
+/**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ */
+int BaseStream::readData(uint8_t *buf, int buflen) {
+  return Serializable::read(buf, buflen,
+                            reinterpret_cast<DataStream*>(composable_stream_));
 }
 
 /**
@@ -138,7 +175,8 @@ int BaseStream::read(uint8_t *value, int len) {
  * @return resulting read size
  **/
 int BaseStream::read(uint32_t &value, bool is_little_endian) {
-  return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+  return Serializable::read(value,
+                            reinterpret_cast<DataStream*>(composable_stream_),
                             is_little_endian);
 }
 
@@ -149,7 +187,8 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::read(uint64_t &value, bool is_little_endian) {
-  return Serializable::read(value, reinterpret_cast<DataStream*>(this),
+  return Serializable::read(value,
+                            reinterpret_cast<DataStream*>(composable_stream_),
                             is_little_endian);
 }
 
@@ -160,9 +199,9 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) {
  * @return resulting read size
  **/
 int BaseStream::readUTF(std::string &str, bool widen) {
-  return Serializable::readUTF(str, reinterpret_cast<DataStream*>(this), widen);
+  return Serializable::readUTF(
+      str, reinterpret_cast<DataStream*>(composable_stream_), widen);
 }
-
 } /* namespace io */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/ProcessorTests.cpp b/libminifi/test/ProcessorTests.cpp
new file mode 100644
index 0000000..ebc408b
--- /dev/null
+++ b/libminifi/test/ProcessorTests.cpp
@@ -0,0 +1,408 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include "unit/ProvenanceTestHelper.h"
+#include "TestBase.h"
+#include "core/logging/LogAppenders.h"
+#include "core/logging/BaseLogger.h"
+#include "processors/GetFile.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+
+
+
+TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
+  std::shared_ptr<core::Processor> processor = std::make_shared<
+        org::apache::nifi::minifi::processors::GetFile>("processorname");
+  REQUIRE(processor->getName() == "processorname");
+}
+
+TEST_CASE("Test Find file", "[getfileCreate2]") {
+
+  TestController testController;
+
+  testController.enableDebug();
+
+  std::shared_ptr<core::Processor> processor = std::make_shared<
+      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Processor> processorReport =
+      std::make_shared<
+          org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>();
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+      TestFlowController>(test_repo, test_repo);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  uuid_t processoruuid;
+  REQUIRE(true == processor->getUUID(processoruuid));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<
+      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(processor);
+  connection->setDestination(processor);
+
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(processoruuid);
+
+  processor->addConnection(connection);
+  REQUIRE(dir != NULL);
+
+  core::ProcessorNode node(processor);
+
+  core::ProcessContext context(node, test_repo);
+  core::ProcessSessionFactory factory(&context);
+  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
+                      dir);
+  core::ProcessSession session(&context);
+
+  processor->onSchedule(&context, &factory);
+  REQUIRE(processor->getName() == "getfileCreate2");
+
+  std::shared_ptr<core::FlowFile> record;
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  processor->onTrigger(&context, &session);
+
+  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+  record = session.get();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  processor->onTrigger(&context, &session);
+  unlink(ss.str().c_str());
+  reporter = session.getProvenanceReporter();
+
+  REQUIRE(processor->getName() == "getfileCreate2");
+
+  records = reporter->getEvents();
+
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == processor->getName());
+  }
+  session.commit();
+  std::shared_ptr<core::FlowFile> ffr = session.get();
+
+  ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+  REQUIRE(2 == repo->getRepoMap().size());
+
+  for (auto entry : repo->getRepoMap()) {
+    provenance::ProvenanceEventRecord newRecord;
+    newRecord.DeSerialize((uint8_t*) entry.second.data(),
+                          entry.second.length());
+
+    bool found = false;
+    for (auto provRec : records) {
+      if (provRec->getEventId() == newRecord.getEventId()) {
+        REQUIRE(provRec->getEventId() == newRecord.getEventId());
+        REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
+        REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
+        REQUIRE(provRec->getDetails() == newRecord.getDetails());
+        REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
+        found = true;
+        break;
+      }
+    }
+    if (!found)
+      throw std::runtime_error("Did not find record");
+
+  }
+
+  core::ProcessorNode nodeReport(processorReport);
+  core::ProcessContext contextReport(nodeReport, test_repo);
+  core::ProcessSessionFactory factoryReport(&contextReport);
+  core::ProcessSession sessionReport(&contextReport);
+  processorReport->onSchedule(&contextReport, &factoryReport);
+  std::shared_ptr<
+      org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport =
+      std::static_pointer_cast<
+          org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
+          processorReport);
+  taskReport->setBatchSize(1);
+  std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> recordsReport;
+  processorReport->incrementActiveTasks();
+  processorReport->setScheduledState(core::ScheduledState::RUNNING);
+  std::string jsonStr;
+  repo->getProvenanceRecord(recordsReport, 1);
+  taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport,
+                            jsonStr);
+  REQUIRE(recordsReport.size() == 1);
+  REQUIRE(
+      taskReport->getName()
+          == std::string(
+              org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
+  REQUIRE(
+      jsonStr.find("\"componentType\" : \"getfileCreate2\"")
+          != std::string::npos);
+}
+
+TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
+
+  TestController testController;
+
+  testController.enableDebug();
+
+  std::shared_ptr<core::Processor> processor = std::make_shared<
+      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+      TestFlowController>(test_repo, test_repo);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  uuid_t processoruuid;
+  REQUIRE(true == processor->getUUID(processoruuid));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<
+      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(processor);
+  connection->setDestination(processor);
+
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(processoruuid);
+
+  processor->addConnection(connection);
+  REQUIRE(dir != NULL);
+
+  core::ProcessorNode node(processor);
+  core::ProcessContext context(node, test_repo);
+  core::ProcessSessionFactory factory(&context);
+  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
+                      dir);
+  // replicate 10 threads
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  processor->onSchedule(&context, &factory);
+
+  int prev = 0;
+  for (int i = 0; i < 10; i++) {
+
+    core::ProcessSession session(&context);
+    REQUIRE(processor->getName() == "getfileCreate2");
+
+    std::shared_ptr<core::FlowFile> record;
+
+    processor->onTrigger(&context, &session);
+
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> records =
+        reporter->getEvents();
+    record = session.get();
+    REQUIRE(record == nullptr);
+    REQUIRE(records.size() == 0);
+
+    std::fstream file;
+    std::stringstream ss;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onTrigger(&context, &session);
+    unlink(ss.str().c_str());
+    reporter = session.getProvenanceReporter();
+
+    REQUIRE(processor->getName() == "getfileCreate2");
+
+    records = reporter->getEvents();
+
+    for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+      REQUIRE(provEventRecord->getComponentType() == processor->getName());
+    }
+    session.commit();
+    std::shared_ptr<core::FlowFile> ffr = session.get();
+
+    REQUIRE((repo->getRepoMap().size() % 2) == 0);
+    REQUIRE(repo->getRepoMap().size() == (prev + 2));
+    prev += 2;
+
+  }
+
+}
+
+TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
+  std::ostringstream oss;
+  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+      logging::BaseLogger>(
+      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+                                                                         0));
+  std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+  logger->updateLogger(std::move(outputLogger));
+
+  TestController testController;
+
+  testController.enableDebug();
+
+  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
+
+  std::shared_ptr<core::Processor> processor = std::make_shared<
+      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
+      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  uuid_t processoruuid;
+  REQUIRE(true == processor->getUUID(processoruuid));
+
+  uuid_t logattribute_uuid;
+  REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<
+      minifi::Connection>(repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
+      minifi::Connection>(repo, "logattribute");
+  connection2->setRelationship(core::Relationship("success", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(processor);
+
+  // link the connections so that we can test results at the end for this
+  connection->setDestination(logAttribute);
+
+  connection2->setSource(logAttribute);
+
+  connection2->setSourceUUID(logattribute_uuid);
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(logattribute_uuid);
+
+  processor->addConnection(connection);
+  logAttribute->addConnection(connection);
+  logAttribute->addConnection(connection2);
+  REQUIRE(dir != NULL);
+
+  core::ProcessorNode node(processor);
+  core::ProcessorNode node2(logAttribute);
+
+  core::ProcessContext context(node, repo);
+  core::ProcessContext context2(node2, repo);
+  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
+                      dir);
+  core::ProcessSession session(&context);
+  core::ProcessSession session2(&context2);
+
+  REQUIRE(processor->getName() == "getfileCreate2");
+
+  std::shared_ptr<core::FlowFile> record;
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+
+  core::ProcessSessionFactory factory(&context);
+  processor->onSchedule(&context, &factory);
+  processor->onTrigger(&context, &session);
+
+  logAttribute->incrementActiveTasks();
+  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
+  core::ProcessSessionFactory factory2(&context2);
+  logAttribute->onSchedule(&context2, &factory2);
+  logAttribute->onTrigger(&context2, &session2);
+
+  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+  record = session.get();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  processor->onTrigger(&context, &session);
+  unlink(ss.str().c_str());
+  reporter = session.getProvenanceReporter();
+
+  records = reporter->getEvents();
+  session.commit();
+  oss.str("");
+  oss.clear();
+
+  logAttribute->incrementActiveTasks();
+  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
+  logAttribute->onTrigger(&context2, &session2);
+
+  //session2.commit();
+  records = reporter->getEvents();
+
+  std::string log_attribute_output = oss.str();
+  REQUIRE(
+      log_attribute_output.find("key:absolute.path value:" + ss.str())
+          != std::string::npos);
+  REQUIRE(log_attribute_output.find("Size:8 Offset:0") != std::string::npos);
+  REQUIRE(
+      log_attribute_output.find("key:path value:" + std::string(dir))
+          != std::string::npos);
+
+  outputLogger = std::unique_ptr<logging::BaseLogger>(
+      new org::apache::nifi::minifi::core::logging::NullAppender());
+  logger->updateLogger(std::move(outputLogger));
+
+}
+
+int fileSize(const char *add) {
+  std::ifstream mySource;
+  mySource.open(add, std::ios_base::binary);
+  mySource.seekg(0, std::ios_base::end);
+  int size = mySource.tellg();
+  mySource.close();
+  return size;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
deleted file mode 100644
index 6432c48..0000000
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <uuid/uuid.h>
-#include <fstream>
-#include "FlowController.h"
-#include "ProvenanceTestHelper.h"
-#include "../TestBase.h"
-#include "core/logging/LogAppenders.h"
-#include "core/logging/BaseLogger.h"
-#include "processors/GetFile.h"
-#include "core/Core.h"
-#include "../../include/core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-
-TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
-  org::apache::nifi::minifi::processors::GetFile processor("processorname");
-  REQUIRE(processor.getName() == "processorname");
-}
-
-TEST_CASE("Test Find file", "[getfileCreate2]") {
-
-  TestController testController;
-
-  testController.enableDebug();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Processor> processorReport = std::make_shared
-      < org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask
-      > ();
-
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
-
-  char format[] = "/tmp/gt.XXXXXX";
-  char *dir = testController.createTempDirectory(format);
-
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-  connection->setDestination(processor);
-
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(processoruuid);
-
-  processor->addConnection(connection);
-  REQUIRE(dir != NULL);
-
-  core::ProcessorNode node(processor);
-
-  core::ProcessContext context(node, test_repo);
-  core::ProcessSessionFactory factory(&context);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-      dir);
-  core::ProcessSession session(&context);
-
-  processor->onSchedule(&context, &factory);
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-
-  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
-  record = session.get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.size() == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << dir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-  unlink(ss.str().c_str());
-  reporter = session.getProvenanceReporter();
-
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  records = reporter->getEvents();
-
-  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
-    REQUIRE(provEventRecord->getComponentType() == processor->getName());
-  }
-  session.commit();
-  std::shared_ptr<core::FlowFile> ffr = session.get();
-
-  ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-  REQUIRE(2 == repo->getRepoMap().size());
-
-  for (auto entry : repo->getRepoMap()) {
-    provenance::ProvenanceEventRecord newRecord;
-    newRecord.DeSerialize((uint8_t*) entry.second.data(),
-        entry.second.length());
-
-    bool found = false;
-    for (auto provRec : records) {
-      if (provRec->getEventId() == newRecord.getEventId()) {
-        REQUIRE(provRec->getEventId() == newRecord.getEventId());
-        REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
-        REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
-        REQUIRE(provRec->getDetails() == newRecord.getDetails());
-        REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
-        found = true;
-        break;
-      }
-    }
-    if (!found)
-      throw std::runtime_error("Did not find record");
-
-  }
-
-  core::ProcessorNode nodeReport(processorReport);
-  core::ProcessContext contextReport(nodeReport, test_repo);
-  core::ProcessSessionFactory factoryReport(&contextReport);
-  core::ProcessSession sessionReport(&contextReport);
-  processorReport->onSchedule(&contextReport, &factoryReport);
-  std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast
-        < org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask > (processorReport);
-  taskReport->setBatchSize(1);
-  std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> recordsReport;
-  processorReport->incrementActiveTasks();
-  processorReport->setScheduledState(core::ScheduledState::RUNNING);
-  std::string jsonStr;
-  repo->getProvenanceRecord(recordsReport, 1);
-  taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport, jsonStr);
-  REQUIRE(recordsReport.size() == 1);
-  REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
-  REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
-}
-
-TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
-
-  TestController testController;
-
-  testController.enableDebug();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
-
-  char format[] = "/tmp/gt.XXXXXX";
-  char *dir = testController.createTempDirectory(format);
-
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-  connection->setDestination(processor);
-
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(processoruuid);
-
-  processor->addConnection(connection);
-  REQUIRE(dir != NULL);
-
-  core::ProcessorNode node(processor);
-  core::ProcessContext context(node, test_repo);
-  core::ProcessSessionFactory factory(&context);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
-  // replicate 10 threads
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onSchedule(&context, &factory);
-
-  int prev = 0;
-  for (int i = 0; i < 10; i++) {
-
-    core::ProcessSession session(&context);
-    REQUIRE(processor->getName() == "getfileCreate2");
-
-    std::shared_ptr<core::FlowFile> record;
-
-    processor->onTrigger(&context, &session);
-
-    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-    std::set<provenance::ProvenanceEventRecord*> records =
-        reporter->getEvents();
-    record = session.get();
-    REQUIRE(record == nullptr);
-    REQUIRE(records.size() == 0);
-
-    std::fstream file;
-    std::stringstream ss;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    processor->onTrigger(&context, &session);
-    unlink(ss.str().c_str());
-    reporter = session.getProvenanceReporter();
-
-    REQUIRE(processor->getName() == "getfileCreate2");
-
-    records = reporter->getEvents();
-
-    for (provenance::ProvenanceEventRecord *provEventRecord : records) {
-      REQUIRE(provEventRecord->getComponentType() == processor->getName());
-    }
-    session.commit();
-    std::shared_ptr<core::FlowFile> ffr = session.get();
-
-    REQUIRE((repo->getRepoMap().size()%2) == 0);
-    REQUIRE(repo->getRepoMap().size() == (prev+2));
-    prev+=2;
-
-  }
-
-}
-
-TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
-  std::ostringstream oss;
-  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
-      logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
-                                                                         0));
-  std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
-  logger->updateLogger(std::move(outputLogger));
-
-  TestController testController;
-
-  testController.enableDebug();
-
-  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
-      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
-  char format[] = "/tmp/gt.XXXXXX";
-  char *dir = testController.createTempDirectory(format);
-
-  uuid_t processoruuid;
-  REQUIRE(true == processor->getUUID(processoruuid));
-
-  uuid_t logattribute_uuid;
-  REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo, "getfileCreate2Connection");
-  connection->setRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo, "logattribute");
-  connection2->setRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor);
-
-  // link the connections so that we can test results at the end for this
-  connection->setDestination(logAttribute);
-
-  connection2->setSource(logAttribute);
-
-  connection2->setSourceUUID(logattribute_uuid);
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(logattribute_uuid);
-
-  processor->addConnection(connection);
-  logAttribute->addConnection(connection);
-  logAttribute->addConnection(connection2);
-  REQUIRE(dir != NULL);
-
-  core::ProcessorNode node(processor);
-  core::ProcessorNode node2(logAttribute);
-
-  core::ProcessContext context(node, repo);
-  core::ProcessContext context2(node2, repo);
-  context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
-  core::ProcessSession session(&context);
-  core::ProcessSession session2(&context2);
-
-  REQUIRE(processor->getName() == "getfileCreate2");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-
-  core::ProcessSessionFactory factory(&context);
-  processor->onSchedule(&context, &factory);
-  processor->onTrigger(&context, &session);
-
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  core::ProcessSessionFactory factory2(&context2);
-  logAttribute->onSchedule(&context2, &factory2);
-  logAttribute->onTrigger(&context2, &session2);
-
-  provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
-  record = session.get();
-  REQUIRE(record == nullptr);
-  REQUIRE(records.size() == 0);
-
-  std::fstream file;
-  std::stringstream ss;
-  ss << dir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << "tempFile";
-  file.close();
-
-  processor->incrementActiveTasks();
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-  processor->onTrigger(&context, &session);
-  unlink(ss.str().c_str());
-  reporter = session.getProvenanceReporter();
-
-  records = reporter->getEvents();
-  session.commit();
-  oss.str("");
-  oss.clear();
-
-  logAttribute->incrementActiveTasks();
-  logAttribute->setScheduledState(core::ScheduledState::RUNNING);
-  logAttribute->onTrigger(&context2, &session2);
-
-  //session2.commit();
-  records = reporter->getEvents();
-
-  std::string log_attribute_output = oss.str();
-  REQUIRE(
-      log_attribute_output.find("key:absolute.path value:" + ss.str())
-          != std::string::npos);
-  REQUIRE(log_attribute_output.find("Size:8 Offset:0") != std::string::npos);
-  REQUIRE(
-      log_attribute_output.find("key:path value:" + std::string(dir))
-          != std::string::npos);
-
-  outputLogger = std::unique_ptr<logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::NullAppender());
-  logger->updateLogger(std::move(outputLogger));
-
-}
-
-int fileSize(const char *add) {
-  std::ifstream mySource;
-  mySource.open(add, std::ios_base::binary);
-  mySource.seekg(0, std::ios_base::end);
-  int size = mySource.tellg();
-  mySource.close();
-  return size;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/unit/Site2SiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp
index a67881d..f831efe 100644
--- a/libminifi/test/unit/Site2SiteTests.cpp
+++ b/libminifi/test/unit/Site2SiteTests.cpp
@@ -100,7 +100,15 @@ void sunny_path_bootstrap(SiteToSiteResponder *collector) {
 }
 
 TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
-
+  std::ostringstream oss;
+
+   std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+       logging::BaseLogger>(
+       new org::apache::nifi::minifi::core::logging::OutputStreamAppender(
+           std::cout, minifi::Configure::getConfigure()));
+   std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+   logger->updateLogger(std::move(outputLogger));
+   logger->setLogLevel("trace");
   SiteToSiteResponder *collector = new SiteToSiteResponder();
 
   sunny_path_bootstrap(collector);
@@ -108,7 +116,7 @@ TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
   std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr
       < minifi::Site2SitePeer
       > (new minifi::Site2SitePeer(
-          std::unique_ptr < minifi::io::DataStream > (collector), "fake_host",
+          std::unique_ptr < minifi::io::DataStream > (new BaseStream(collector)), "fake_host",
           65433));
 
   minifi::Site2SiteClientProtocol protocol(std::move(peer));
@@ -184,7 +192,7 @@ TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
   std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr
       < minifi::Site2SitePeer
       > (new minifi::Site2SitePeer(
-          std::unique_ptr < minifi::io::DataStream > (collector), "fake_host",
+          std::unique_ptr < minifi::io::DataStream > (new BaseStream(collector)), "fake_host",
           65433));
 
   minifi::Site2SiteClientProtocol protocol(std::move(peer));

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/53f09462/libminifi/test/unit/SiteToSiteHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index 36a7000..8c33396 100755
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -137,7 +137,7 @@ public:
   virtual int read(uint64_t &value, bool is_little_endian =
       minifi::io::EndiannessCheck::IS_LITTLE) {
     value = std::stoull(get_next_response());
-    return 4;
+    return 8;
   }
 
   /**


Mime
View raw message