nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject nifi-minifi-cpp git commit: MINIFI-275 Bugfix for YAML Configs without Component IDs
Date Wed, 03 May 2017 04:04:55 GMT
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 45ff49990 -> d1e3694c3


MINIFI-275 Bugfix for YAML Configs without Component IDs

This commit fixes a bug in which loading config YAML that did not
specify IDs (UUIDs) for components caused an exception. The logic
previously treated component IDs as required fields by loading them
without checking for existence. This commit updates the logic to
generate IDs when they are not specified in the YAML config. As part
of this fix, the logic for loading connections from YAML config was
modified to search by name or remote port id in the absence of source
id or remote id in the YAML config.

For fields that are required, useful error messages were added when
those fields are missing to assist users in self-diagnosing issues
related to invalid config files.

Some minor tweaks to the top level .gitignore are included with this
commit.

This closes #85.

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/d1e3694c
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/d1e3694c
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/d1e3694c

Branch: refs/heads/master
Commit: d1e3694c32f566836d06577ab1da609b3f9dca14
Parents: 45ff499
Author: Kevin Doran <kdoran.apache@gmail.com>
Authored: Fri Apr 28 17:43:37 2017 -0400
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Wed May 3 00:04:03 2017 -0400

----------------------------------------------------------------------
 .gitignore                                      |   9 +-
 CMakeLists.txt                                  |   2 +
 libminifi/include/core/yaml/YamlConfiguration.h | 190 +++++++++--
 libminifi/src/core/yaml/YamlConfiguration.cpp   | 331 ++++++++++++-------
 libminifi/test/unit/YamlCongifurationTests.cpp  | 185 +++++++++++
 5 files changed, 570 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d1e3694c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 846c717..2da62d5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,9 @@
+# Standard ignores
+.DS_Store
+
+# Ignore JetBrains project files
+.idea
+
 # Filter out generated files from the included libuuid
 thirdparty/uuid/tst_uuid*
 assemblies
@@ -19,5 +25,4 @@ thirdparty/**/*.a
 docs/generated
 
 # Ignore source files that have been placed in the docker directory during build
-docker/minificppsource
-docker/.DS_STORE
+docker/minificppsource
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d1e3694c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f45dce9..30855be 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -152,6 +152,7 @@ enable_testing(test)
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/core")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/core/repository")
+    target_include_directories(tests PRIVATE BEFORE "libminifi/include/core/yaml")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/io")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils")
     target_include_directories(tests PRIVATE BEFORE "libminifi/include/processors")
@@ -215,6 +216,7 @@ enable_testing(test)
     target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/")
     target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core")
     target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core/repository")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core/yaml")
     target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/io")
     target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/utils")
     target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/processors")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d1e3694c/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 319f33a..fb54869 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -32,7 +32,11 @@ namespace minifi {
 namespace core {
 
 #define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml"
+#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
 #define CONFIG_YAML_PROCESSORS_KEY "Processors"
+#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
+#define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY "Remote Processing Groups"
+#define CONFIG_YAML_PROVENANCE_REPORT_KEY "Provenance Reporting"
 
 class YamlConfiguration : public FlowConfiguration {
 
@@ -50,48 +54,192 @@ class YamlConfiguration : public FlowConfiguration {
 
   }
 
-  std::unique_ptr<core::ProcessGroup> getRoot(const std::string &from_config) {
+  /**
+   * Returns a shared pointer to a ProcessGroup object containing the
+   * flow configuration. The yamlConfigStr argument must hold a string
+   * for the raw YAML configuration.
+   *
+   * @param yamlConfigStr a string holding the raw YAML to be parsed and
+   *                        loaded into a flow configuration tree
+   * @return              the root ProcessGroup node of the flow
+   *                        configuration tree
+   */
+  std::unique_ptr<core::ProcessGroup> getRoot(const std::string &yamlConfigStr) {
+    YAML::Node rootYamlNode = YAML::LoadFile(yamlConfigStr);
+    return getRoot(&rootYamlNode);
+  }
+
+  /**
+   * Returns a shared pointer to a ProcessGroup object containing the
+   * flow configuration. The yamlConfigStream argument must point to
+   * an input stream for the raw YAML configuration.
+   *
+   * @param yamlConfigStream an input stream for the raw YAML configuration
+   *                           to be parsed and loaded into the flow
+   *                           configuration tree
+   * @return                 the root ProcessGroup node of the flow
+   *                           configuration tree
+   */
+  std::unique_ptr<core::ProcessGroup> getRoot(std::istream &yamlConfigStream) {
+    YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
+    return getRoot(&rootYamlNode);
+  }
 
-    YAML::Node flow = YAML::LoadFile(from_config);
+ protected:
 
-    YAML::Node flowControllerNode = flow["Flow Controller"];
-    YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
-    YAML::Node connectionsNode = flow["Connections"];
-    YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
-    YAML::Node provenanceReportNode = flow["Provenance Reporting"];
+  /**
+   * Returns a shared pointer to a ProcessGroup object containing the
+   * flow configuration. The rootYamlNode argument must point to
+   * a YAML::Node object containing the root node of the parsed YAML
+   * for the flow configuration.
+   *
+   * @param rootYamlNode a pointer to a YAML::Node object containing the root
+   *                       node of the parsed YAML document
+   * @return             the root ProcessGroup node of the flow
+   *                       configuration tree
+   */
+  std::unique_ptr<core::ProcessGroup> getRoot(YAML::Node *rootYamlNode) {
+    YAML::Node rootYaml = *rootYamlNode;
+    YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
+    YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
+    YAML::Node connectionsNode = rootYaml[CONFIG_YAML_CONNECTIONS_KEY];
+    YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+    YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
 
     // Create the root process group
     core::ProcessGroup * root = parseRootProcessGroupYaml(flowControllerNode);
     parseProcessorNodeYaml(processorsNode, root);
-    parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, root);
+    parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root);
     parseConnectionYaml(&connectionsNode, root);
     parseProvenanceReportingYaml(&provenanceReportNode, root);
 
     return std::unique_ptr<core::ProcessGroup>(root);
-
   }
- protected:
-  // Process Processor Node YAML
+
+  /**
+   * Parses a processor from its corresponding YAML config node and adds
+   * it to a parent ProcessGroup. The processorNode argument must point
+   * to a YAML::Node containing the processor configuration. A Processor
+   * object will be created and added to the parent ProcessGroup specified
+   * by the parent argument.
+   *
+   * @param processorNode the YAML::Node containing the processor configuration
+   * @param parent        the parent ProcessGroup to which the created
+   *                        Processor should be added
+   */
   void parseProcessorNodeYaml(YAML::Node processorNode,
                               core::ProcessGroup * parent);
-  // Process Port YAML
+
+  /**
+   * Parses a port from its corresponding YAML config node and adds
+   * it to a parent ProcessGroup. The portNode argument must point
+   * to a YAML::Node containing the port configuration. A RemoteProcessorGroupPort
+   * object will be created and added to the parent ProcessGroup specified
+   * by the parent argument.
+   *
+   * @param portNode  the YAML::Node containing the port configuration
+   * @param parent    the parent ProcessGroup for the port
+   * @param direction the TransferDirection of the port
+   */
   void parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent,
                      TransferDirection direction);
-  // Process Root Processor Group YAML
+
+
+  /**
+   * Parses the root level YAML node for the flow configuration and
+   * returns a ProcessGroup containing the tree of flow configuration
+   * objects.
+   *
+   * @param rootNode
+   * @return
+   */
   core::ProcessGroup *parseRootProcessGroupYaml(YAML::Node rootNode);
-  // Process Property YAML
-  void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node,
-                                  std::shared_ptr<core::Processor> processor);
-  // Process connection YAML
+
+  /**
+   * Parses the Connections section of a configuration YAML.
+   * The resulting Connections are added to the parent ProcessGroup.
+   *
+   * @param node   the YAML::Node containing the Connections section
+   *                 of the configuration YAML
+   * @param parent the root node of flow configuration to which
+   *                 to add the connections that are parsed
+   */
   void parseConnectionYaml(YAML::Node *node, core::ProcessGroup * parent);
-  // Process Remote Process Group YAML
+
+  /**
+   * Parses the Remote Process Group section of a configuration YAML.
+   * The resulting Process Group is added to the parent ProcessGroup.
+   *
+   * @param node   the YAML::Node containing the Remote Process Group
+   *                 section of the configuration YAML
+   * @param parent the root node of flow configuration to which
+   *                 to add the process groups that are parsed
+   */
   void parseRemoteProcessGroupYaml(YAML::Node *node,
                                    core::ProcessGroup * parent);
-  // Process Provenance Report YAML
-  void parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup * parentGroup);
-  // Parse Properties Node YAML for a processor
+
+  /**
+   * Parses the Provenance Reporting section of a configuration YAML.
+   * The resulting Provenance Reporting processor is added to the
+   * parent ProcessGroup.
+   *
+   * @param reportNode  the YAML::Node containing the provenance
+   *                      reporting configuration
+   * @param parentGroup the root node of flow configuration to which
+   *                      to add the provenance reporting config
+   */
+  void parseProvenanceReportingYaml(YAML::Node *reportNode,
+                                    core::ProcessGroup * parentGroup);
+
+  /**
+   * A helper function to parse the Properties Node YAML for a processor.
+   *
+   * @param propertiesNode the YAML::Node containing the properties
+   * @param processor      the Processor to which to add the resulting properties
+   */
   void parsePropertiesNodeYaml(YAML::Node *propertiesNode,
                                std::shared_ptr<core::Processor> processor);
+
+  /**
+   * A helper function for parsing or generating optional id fields.
+   *
+   * In parsing YAML flow configurations for config schema v1, the
+   * 'id' field of most component types that contains a UUID is optional.
+   * This function will check for the existence of the specified
+   * idField in the specified yamlNode. If present, the field will be parsed
+   * as a UUID and the UUID string will be returned. If not present, a
+   * random UUID string will be generated and returned.
+   *
+   * @param yamlNode a pointer to the YAML::Node that will be checked for the
+   *                   presence of an idField
+   * @param idField  the string of the name of the idField to check for. This
+   *                   is optional and defaults to 'id'
+   * @return         the parsed or generated UUID string
+   */
+  std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField = "id");
+
+  /**
+   * This is a helper function for verifying the existence of a required
+   * field in a YAML::Node object. If the field is not present, an error
+   * message will be logged and a std::invalid_argument exception will be
+   * thrown indicating the absence of the required field in the YAML node.
+   *
+   * @param yamlNode     the YAML node to check
+   * @param fieldName    the required field key
+   * @param yamlSection  [optional] the top level section of the YAML config
+   *                       for the yamlNode. This is used for generating a
+   *                       useful error message for troubleshooting.
+   * @param errorMessage [optional] the error message string to use if
+   *                       the required field is missing. If not provided,
+   *                       a default error message will be generated.
+   *
+   * @throws std::invalid_argument if the required field 'fieldName' is
+   *                               not present in 'yamlNode'
+   */
+  void checkRequiredField(YAML::Node *yamlNode,
+                          const std::string &fieldName,
+                          const std::string &yamlSection = "",
+                          const std::string &errorMessage = "");
 };
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d1e3694c/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 5484e36..0429f2c 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -31,23 +31,13 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(
     YAML::Node rootFlowNode) {
   uuid_t uuid;
 
+  checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   std::string flowName = rootFlowNode["name"].as<std::string>();
-  std::string id;
-
-  try {
-    rootFlowNode["id"].as<std::string>();
-
-    uuid_parse(id.c_str(), uuid);
-  } catch (...) {
-    logger_->log_warn("Generating random ID for root node");
-    uuid_generate(uuid);
-    char uuid_str[37];
-    uuid_unparse(uuid, uuid_str);
-    id = uuid_str;
-  }
+  std::string id = getOrGenerateId(&rootFlowNode);
+  uuid_parse(id.c_str(), uuid);
 
-  logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
-  logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
+  logger_->log_debug(
+      "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName);
   std::unique_ptr<core::ProcessGroup> group =
       FlowConfiguration::createRootProcessGroup(flowName, uuid);
 
@@ -73,22 +63,22 @@ void YamlConfiguration::parseProcessorNodeYaml(
   if (processorsNode) {
     if (processorsNode.IsSequence()) {
       // Evaluate sequence of processors
-      int numProcessors = processorsNode.size();
-
       for (YAML::const_iterator iter = processorsNode.begin();
           iter != processorsNode.end(); ++iter) {
         core::ProcessorConfig procCfg;
         YAML::Node procNode = iter->as<YAML::Node>();
 
+        checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
+        checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
+
         procCfg.name = procNode["name"].as<std::string>();
-        procCfg.id = procNode["id"].as<std::string>();
+        procCfg.id = getOrGenerateId(&procNode);
+        uuid_parse(procCfg.id.c_str(), uuid);
         logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
-                           procCfg.name.c_str(), procCfg.id.c_str());
+                           procCfg.name, procCfg.id);
         procCfg.javaClass = procNode["class"].as<std::string>();
         logger_->log_debug("parseProcessorNode: class => [%s]",
-                           procCfg.javaClass.c_str());
-
-        uuid_parse(procCfg.id.c_str(), uuid);
+                           procCfg.javaClass);
 
         // Determine the processor name only from the Java class
         int lastOfIdx = procCfg.javaClass.find_last_of(".");
@@ -101,8 +91,8 @@ void YamlConfiguration::parseProcessorNodeYaml(
         }
 
         if (!processor) {
-          logger_->log_error("Could not create a processor %s with name %s",
-                             procCfg.name.c_str(), procCfg.id.c_str());
+          logger_->log_error("Could not create a processor %s with id %s",
+                             procCfg.name, procCfg.id);
           throw std::invalid_argument(
               "Could not create processor " + procCfg.name);
         }
@@ -111,25 +101,27 @@ void YamlConfiguration::parseProcessorNodeYaml(
         procCfg.maxConcurrentTasks = procNode["max concurrent tasks"]
             .as<std::string>();
         logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]",
-                           procCfg.maxConcurrentTasks.c_str());
-        procCfg.schedulingStrategy = procNode["scheduling strategy"]
-            .as<std::string>();
+                           procCfg.maxConcurrentTasks);
+
+        procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
         logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]",
-                           procCfg.schedulingStrategy.c_str());
-        procCfg.schedulingPeriod =
-            procNode["scheduling period"].as<std::string>();
+                           procCfg.schedulingStrategy);
+
+        procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
         logger_->log_debug("parseProcessorNode: scheduling period => [%s]",
-                           procCfg.schedulingPeriod.c_str());
-        procCfg.penalizationPeriod = procNode["penalization period"]
-            .as<std::string>();
+                           procCfg.schedulingPeriod);
+
+        procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
         logger_->log_debug("parseProcessorNode: penalization period => [%s]",
-                           procCfg.penalizationPeriod.c_str());
+                           procCfg.penalizationPeriod);
+
         procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
         logger_->log_debug("parseProcessorNode: yield period => [%s]",
-                           procCfg.yieldPeriod.c_str());
+                           procCfg.yieldPeriod);
+
         procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
         logger_->log_debug("parseProcessorNode: run duration nanos => [%s]",
-                           procCfg.runDurationNanos.c_str());
+                           procCfg.runDurationNanos);
 
         // handle auto-terminated relationships
         YAML::Node autoTerminatedSequence =
@@ -187,16 +179,13 @@ void YamlConfiguration::parseProcessorNodeYaml(
 
         if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
           processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s",
-                             procCfg.schedulingStrategy.c_str());
+          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
         } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
           processor->setSchedulingStrategy(core::EVENT_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s",
-                             procCfg.schedulingStrategy.c_str());
+          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
         } else {
           processor->setSchedulingStrategy(core::CRON_DRIVEN);
-          logger_->log_debug("setting scheduling strategy as %s",
-                             procCfg.schedulingStrategy.c_str());
+          logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
         }
 
         int64_t maxConcurrentTasks;
@@ -204,14 +193,14 @@ void YamlConfiguration::parseProcessorNodeYaml(
                                         maxConcurrentTasks)) {
           logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
                              maxConcurrentTasks);
-          processor->setMaxConcurrentTasks(maxConcurrentTasks);
+          processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
         }
 
         if (core::Property::StringToInt(procCfg.runDurationNanos,
                                         runDurationNanos)) {
           logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]",
                              runDurationNanos);
-          processor->setRunDurationNano(runDurationNanos);
+          processor->setRunDurationNano((uint64_t) runDurationNanos);
         }
 
         std::set<core::Relationship> autoTerminatedRelationships;
@@ -219,7 +208,7 @@ void YamlConfiguration::parseProcessorNodeYaml(
           core::Relationship relationship(relString, "");
           logger_->log_debug(
               "parseProcessorNode: autoTerminatedRelationship  => [%s]",
-              relString.c_str());
+              relString);
           autoTerminatedRelationships.insert(relationship);
         }
 
@@ -230,13 +219,15 @@ void YamlConfiguration::parseProcessorNodeYaml(
     }
   } else {
     throw new std::invalid_argument(
-        "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
+        "Cannot instantiate a MiNiFi instance without a defined "
+            "Processors configuration node.");
   }
 }
 
 void YamlConfiguration::parseRemoteProcessGroupYaml(
     YAML::Node *rpgNode, core::ProcessGroup * parentGroup) {
   uuid_t uuid;
+  std::string id;
 
   if (!parentGroup) {
     logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists");
@@ -247,29 +238,27 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(
     if (rpgNode->IsSequence()) {
       for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end();
           ++iter) {
-        YAML::Node rpgNode = iter->as<YAML::Node>();
+        YAML::Node currRpgNode = iter->as<YAML::Node>();
 
-        auto name = rpgNode["name"].as<std::string>();
-        auto id = rpgNode["id"].as<std::string>();
+        auto name = currRpgNode["name"].as<std::string>();
+        id = getOrGenerateId(&currRpgNode);
 
         logger_->log_debug(
-            "parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
-            name.c_str(), id.c_str());
+            "parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
 
-        std::string url = rpgNode["url"].as<std::string>();
-        logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
-                           url.c_str());
+        std::string url = currRpgNode["url"].as<std::string>();
+        logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
 
-        std::string timeout = rpgNode["timeout"].as<std::string>();
+        std::string timeout = currRpgNode["timeout"].as<std::string>();
         logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]",
-                           timeout.c_str());
+                           timeout);
 
-        std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
+        std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
         logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]",
-                           yieldPeriod.c_str());
+                           yieldPeriod);
 
-        YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
-        YAML::Node outputPorts = rpgNode["Output Ports"].as<YAML::Node>();
+        YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
+        YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
         core::ProcessGroup *group = NULL;
 
         uuid_parse(id.c_str(), uuid);
@@ -352,11 +341,17 @@ void YamlConfiguration::parseProvenanceReportingYaml(
 
   YAML::Node node = reportNode->as<YAML::Node>();
 
+  checkRequiredField(&node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
+  checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
+  checkRequiredField(&node, "host", CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto hostStr = node["host"].as<std::string>();
+  checkRequiredField(&node, "port", CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto portStr = node["port"].as<std::string>();
+  checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto portUUIDStr = node["port uuid"].as<std::string>();
+  checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto batchSizeStr = node["batch size"].as<std::string>();
 
   // add processor to parent
@@ -364,9 +359,8 @@ void YamlConfiguration::parseProvenanceReportingYaml(
   processor->setScheduledState(core::RUNNING);
 
   core::TimeUnit unit;
-  if (core::Property::StringToTime(schedulingPeriodStr,
-      schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
-          schedulingPeriod)) {
+  if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) &&
+      core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
     logger_->log_debug(
         "ProvenanceReportingTask schedulingPeriod %d ns",
         schedulingPeriod);
@@ -375,20 +369,21 @@ void YamlConfiguration::parseProvenanceReportingYaml(
 
   if (schedulingStrategyStr == "TIMER_DRIVEN") {
      processor->setSchedulingStrategy(core::TIMER_DRIVEN);
-     logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr.c_str());
+     logger_->log_debug(
+         "ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr);
   } else {
     throw std::invalid_argument(
         "Invalid scheduling strategy " +  schedulingStrategyStr);
   }
 
   reportTask->setHost(hostStr);
-  logger_->log_debug("ProvenanceReportingTask host %s", hostStr.c_str());
+  logger_->log_debug("ProvenanceReportingTask host %s", hostStr);
   int64_t lvalue;
   if (core::Property::StringToInt(portStr, lvalue)) {
     logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue);
     reportTask->setPort((uint16_t) lvalue);
   }
-  logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr.c_str());
+  logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
   uuid_parse(portUUIDStr.c_str(), port_uuid);
   reportTask->setPortUUID(port_uuid);
   if (core::Property::StringToInt(batchSizeStr, lvalue)) {
@@ -396,11 +391,9 @@ void YamlConfiguration::parseProvenanceReportingYaml(
   }
 }
 
-void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
-                                            core::ProcessGroup *parent) {
-  uuid_t uuid;
-  std::shared_ptr<minifi::Connection> connection = nullptr;
-
+void YamlConfiguration::parseConnectionYaml(
+    YAML::Node *connectionsNode,
+    core::ProcessGroup *parent) {
   if (!parent) {
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
@@ -411,69 +404,101 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
       for (YAML::const_iterator iter = connectionsNode->begin();
           iter != connectionsNode->end(); ++iter) {
         YAML::Node connectionNode = iter->as<YAML::Node>();
+        std::shared_ptr<minifi::Connection> connection = nullptr;
 
+        // Configure basic connection
+        uuid_t uuid;
         std::string name = connectionNode["name"].as<std::string>();
-        std::string id = connectionNode["id"].as<std::string>();
-        std::string destId = connectionNode["destination id"].as<std::string>();
-
+        std::string id = getOrGenerateId(&connectionNode);
         uuid_parse(id.c_str(), uuid);
-
-        logger_->log_debug("Created connection with UUID %s and name %s",
-                           id.c_str(), name.c_str());
         connection = this->createConnection(name, uuid);
+        logger_->log_debug(
+            "Created connection with UUID %s and name %s", id, name);
+
+
+        // Configure connection source
         auto rawRelationship = connectionNode["source relationship name"]
             .as<std::string>();
         core::Relationship relationship(rawRelationship, "");
-        logger_->log_debug("parseConnection: relationship => [%s]",
-                           rawRelationship.c_str());
-        if (connection)
+        logger_->log_debug(
+            "parseConnection: relationship => [%s]", rawRelationship);
+        if (connection) {
           connection->setRelationship(relationship);
-        std::string connectionSrcProcId = connectionNode["source id"]
-            .as<std::string>();
-        uuid_t srcUUID;
-        uuid_parse(connectionSrcProcId.c_str(), srcUUID);
+        }
 
-        auto srcProcessor = parent->findProcessor(srcUUID);
+        uuid_t srcUUID;
+        std::string connectionSrcProcName = connectionNode["source name"]
+            .as<std::string>();
+        if (connectionNode["source id"]) {
+          std::string connectionSrcProcId = connectionNode["source id"]
+              .as<std::string>();
 
-        if (!srcProcessor) {
-          logger_->log_error(
-              "Could not locate a source with id %s to create a connection",
-              connectionSrcProcId.c_str());
-          throw std::invalid_argument(
-              "Could not locate a source with id %s to create a connection "
-                  + connectionSrcProcId);
+          uuid_parse(connectionSrcProcId.c_str(), srcUUID);
+        } else {
+          // if we don't have a source id, try harder to resolve the source processor.
+          // config schema v2 will make this unnecessary
+          uuid_t tmpUUID;
+          if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) &&
+              NULL != parent->findProcessor(tmpUUID)) {
+            // the source name is a remote port id, so use that as the source id
+            uuid_copy(srcUUID, tmpUUID);
+          } else {
+            // lastly, look the processor up by name
+            auto srcProcessor = parent->findProcessor(connectionSrcProcName);
+            if (NULL != srcProcessor) {
+              srcProcessor->getUUID(srcUUID);
+            } else {
+              // we ran out of ways to discover the source processor
+              logger_->log_error(
+                  "Could not locate a source with name %s to create a connection",
+                  connectionSrcProcName);
+              throw std::invalid_argument(
+                  "Could not locate a source with name " +
+                      connectionSrcProcName + " to create a connection ");
+            }
+          }
         }
+        connection->setSourceUUID(srcUUID);
 
+        // Configure connection destination
         uuid_t destUUID;
-        uuid_parse(destId.c_str(), destUUID);
-        auto destProcessor = parent->findProcessor(destUUID);
-        // If we could not find name, try by UUID
-        if (!destProcessor) {
-          uuid_t destUuid;
-          uuid_parse(destId.c_str(), destUuid);
-          destProcessor = parent->findProcessor(destUuid);
-        }
-        if (destProcessor) {
-          std::string destUuid = destProcessor->getUUIDStr();
+        std::string connectionDestProcName = connectionNode["destination name"]
+            .as<std::string>();
+        if (connectionNode["destination id"]) {
+          std::string connectionDestProcId = connectionNode["destination id"]
+              .as<std::string>();
+          uuid_parse(connectionDestProcId.c_str(), destUUID);
+        } else {
+          // we use the same logic as above for resolving the source processor
+          // for looking up the destination processor in absence of a processor id
+          uuid_t tmpUUID;
+          if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
+              NULL != parent->findProcessor(tmpUUID)) {
+            // the destination name is a remote port id, so use that as the dest id
+            uuid_copy(destUUID, tmpUUID);
+          } else {
+            // look the processor up by name
+            auto destProcessor = parent->findProcessor(connectionDestProcName);
+            if (NULL != destProcessor) {
+              destProcessor->getUUID(destUUID);
+            } else {
+              // we ran out of ways to discover the destination processor
+              logger_->log_error(
+                  "Could not locate a destination with name %s to create a connection",
+                  connectionDestProcName);
+              throw std::invalid_argument(
+                  "Could not locate a destination with name " +
+                      connectionDestProcName + " to create a connection");
+            }
+          }
         }
-
-        uuid_t srcUuid;
-        uuid_t destUuid;
-        srcProcessor->getUUID(srcUuid);
-        connection->setSourceUUID(srcUuid);
-        destProcessor->getUUID(destUuid);
-        connection->setDestinationUUID(destUuid);
+        connection->setDestinationUUID(destUUID);
 
         if (connection) {
           parent->addConnection(connection);
         }
       }
     }
-
-    if (connection)
-      parent->addConnection(connection);
-
-    return;
   }
 }
 
@@ -482,7 +507,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
                                       TransferDirection direction) {
   uuid_t uuid;
   std::shared_ptr<core::Processor> processor = NULL;
-  minifi::RemoteProcessorGroupPort *port = NULL;
+  std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;
 
   if (!parent) {
     logger_->log_error("parseProcessNode: no parent group existed");
@@ -491,16 +516,20 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode,
 
   YAML::Node inputPortsObj = portNode->as<YAML::Node>();
 
-  // generate the random UIID
-  uuid_generate(uuid);
-
-  auto portId = inputPortsObj["id"].as<std::string>();
+  // Check for required fields
+  checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   auto nameStr = inputPortsObj["name"].as<std::string>();
+  checkRequiredField(&inputPortsObj, "id", "The field 'id' is required for "
+      "the port named '" + nameStr + "' in the YAML Config. If this port "
+      "is an input port for a NiFi Remote Process Group, the port "
+      "id should match the corresponding id specified in the NiFi configuration. "
+      "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
+  auto portId = inputPortsObj["id"].as<std::string>();
   uuid_parse(portId.c_str(), uuid);
 
-  port = new minifi::RemoteProcessorGroupPort(nameStr.c_str(), uuid);
+  port = std::make_shared<minifi::RemoteProcessorGroupPort>(nameStr, uuid);
 
-  processor = (std::shared_ptr<core::Processor>) port;
+  processor = std::static_pointer_cast<core::Processor>(port);
   port->setDirection(direction);
   port->setTimeOut(parent->getTimeOut());
   port->setTransmitting(true);
@@ -538,11 +567,65 @@ void YamlConfiguration::parsePropertiesNodeYaml(
       std::string rawValueString = propertyValueNode.as<std::string>();
       if (!processor->setProperty(propertyName, rawValueString)) {
         logger_->log_warn(
-            "Received property %s with value %s but is not one of the properties for %s",
-            propertyName.c_str(), rawValueString.c_str(),
-            processor->getName().c_str());
+            "Received property %s with value %s but it is not one of the properties for %s",
+            propertyName,
+            rawValueString,
+            processor->getName());
+      }
+    }
+  }
+}
+
+std::string YamlConfiguration::getOrGenerateId(
+    YAML::Node *yamlNode,
+    const std::string &idField) {
+  // Returns the scalar stored under idField, or a freshly generated UUID
+  // string when the node has no such field; throws if it is not a scalar.
+  YAML::Node root = yamlNode->as<YAML::Node>();
+  if (!root[idField]) {
+    uuid_t generated;
+    uuid_generate(generated);
+    char text[37];  // 36 characters of UUID text plus the terminating NUL
+    uuid_unparse(generated, text);
+    std::string newId = text;
+    logger_->log_debug("Generating random ID: id => [%s]", newId);
+    return newId;
+  }
+
+  if (YAML::NodeType::Scalar != root[idField].Type()) {
+    throw std::invalid_argument(
+        "getOrGenerateId: idField is expected to reference YAML::Node "
+            "of YAML::NodeType::Scalar.");
+  }
+
+  return root[idField].as<std::string>();
+}
+
+void YamlConfiguration::checkRequiredField(
+    YAML::Node *yamlNode,
+    const std::string &fieldName,
+    const std::string &yamlSection,     // appended to the generated message when non-empty
+    const std::string &errorMessage) {  // used verbatim as the message when non-empty
+  // Logs and throws std::invalid_argument if yamlNode has no field named fieldName.
+  std::string errMsg = errorMessage;
+  if (!yamlNode->as<YAML::Node>()[fieldName]) {
+    if (errMsg.empty()) {
+      // Build a helpful error message for the user so they can fix the
+      // invalid YAML config file, using the component name if present
+      errMsg =
+          yamlNode->as<YAML::Node>()["name"] ?
+          "Unable to parse configuration file for component named '" +
+              yamlNode->as<YAML::Node>()["name"].as<std::string>() +
+              "' as required field '" + fieldName + "' is missing" :
+          "Unable to parse configuration file as required field '" +
+              fieldName + "' is missing";
+      if (!yamlSection.empty()) {
+        errMsg += " [in '" + yamlSection +
+            "' section of configuration file]";
       }
     }
+    logger_->log_error(errMsg.c_str());  // errMsg, not errorMessage: the latter may be empty
+    throw std::invalid_argument(errMsg);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d1e3694c/libminifi/test/unit/YamlCongifurationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/YamlCongifurationTests.cpp b/libminifi/test/unit/YamlCongifurationTests.cpp
new file mode 100644
index 0000000..f08304c
--- /dev/null
+++ b/libminifi/test/unit/YamlCongifurationTests.cpp
@@ -0,0 +1,185 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <core/RepositoryFactory.h>
+#include "core/yaml/YamlConfiguration.h"
+#include "../TestBase.h"
+
+static const std::shared_ptr<core::Repository> TEST_PROV_REPO = core::createRepository("provenancerepository", true);  // passed to YamlConfiguration in the tests
+static const std::shared_ptr<core::Repository> TEST_FF_REPO = core::createRepository("flowfilerepository", true);  // passed to YamlConfiguration in the tests
+
+TEST_CASE("Test YAML Config 1", "[testyamlconfig1]") {
+  // The only 'id' in this config is on the RPG input port; all other components omit it
+  static const std::string TEST_YAML_WITHOUT_IDS = "MiNiFi Config Version: 1\n"
+      "Flow Controller:\n"
+      "    name: MiNiFi Flow\n"
+      "    comment:\n"
+      "\n"
+      "Core Properties:\n"
+      "    flow controller graceful shutdown period: 10 sec\n"
+      "    flow service write delay interval: 500 ms\n"
+      "    administrative yield duration: 30 sec\n"
+      "    bored yield duration: 10 millis\n"
+      "\n"
+      "FlowFile Repository:\n"
+      "    partitions: 256\n"
+      "    checkpoint interval: 2 mins\n"
+      "    always sync: false\n"
+      "    Swap:\n"
+      "        threshold: 20000\n"
+      "        in period: 5 sec\n"
+      "        in threads: 1\n"
+      "        out period: 5 sec\n"
+      "        out threads: 4\n"
+      "\n"
+      "Provenance Repository:\n"
+      "    provenance rollover time: 1 min\n"
+      "\n"
+      "Content Repository:\n"
+      "    content claim max appendable size: 10 MB\n"
+      "    content claim max flow files: 100\n"
+      "    always sync: false\n"
+      "\n"
+      "Component Status Repository:\n"
+      "    buffer size: 1440\n"
+      "    snapshot frequency: 1 min\n"
+      "\n"
+      "Security Properties:\n"
+      "    keystore: /tmp/ssl/localhost-ks.jks\n"
+      "    keystore type: JKS\n"
+      "    keystore password: localtest\n"
+      "    key password: localtest\n"
+      "    truststore: /tmp/ssl/localhost-ts.jks\n"
+      "    truststore type: JKS\n"
+      "    truststore password: localtest\n"
+      "    ssl protocol: TLS\n"
+      "    Sensitive Props:\n"
+      "        key:\n"
+      "        algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
+      "        provider: BC\n"
+      "\n"
+      "Processors:\n"
+      "    - name: TailFile\n"
+      "      class: org.apache.nifi.processors.standard.TailFile\n"
+      "      max concurrent tasks: 1\n"
+      "      scheduling strategy: TIMER_DRIVEN\n"
+      "      scheduling period: 1 sec\n"
+      "      penalization period: 30 sec\n"
+      "      yield period: 1 sec\n"
+      "      run duration nanos: 0\n"
+      "      auto-terminated relationships list:\n"
+      "      Properties:\n"
+      "          File to Tail: logs/minifi-app.log\n"
+      "          Rolling Filename Pattern: minifi-app*\n"
+      "          Initial Start Position: Beginning of File\n"
+      "\n"
+      "Connections:\n"
+      "    - name: TailToS2S\n"
+      "      source name: TailFile\n"
+      "      source relationship name: success\n"
+      "      destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+      "      max work queue size: 0\n"
+      "      max work queue data size: 1 MB\n"
+      "      flowfile expiration: 60 sec\n"
+      "      queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
+      "\n"
+      "Remote Processing Groups:\n"
+      "    - name: NiFi Flow\n"
+      "      comment:\n"
+      "      url: https://localhost:8090/nifi\n"
+      "      timeout: 30 secs\n"
+      "      yield period: 10 sec\n"
+      "      Input Ports:\n"
+      "          - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+      "            name: tailed log\n"
+      "            comments:\n"
+      "            max concurrent tasks: 1\n"
+      "            use compression: false\n"
+      "\n"
+      "Provenance Reporting:\n"
+      "    comment:\n"
+      "    scheduling strategy: TIMER_DRIVEN\n"
+      "    scheduling period: 30 sec\n"
+      "    host: localhost\n"
+      "    port name: provenance\n"
+      "    port: 8090\n"
+      "    port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
+      "    destination url: https://localhost:8090/\n"
+      "    originating url: http://${hostname(true)}:8081/nifi\n"
+      "    use compression: true\n"
+      "    timeout: 30 secs\n"
+      "    batch size: 1000";
+
+  // Held by value (not leaked via new) so it is destroyed on every exit path,
+  // including a failed REQUIRE
+  core::YamlConfiguration yamlConfig(TEST_PROV_REPO, TEST_FF_REPO);
+  std::istringstream yamlstream(TEST_YAML_WITHOUT_IDS);
+  std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getRoot(yamlstream);
+
+  REQUIRE(rootFlowConfig);
+
+  // The TailFile processor has no id in the YAML, yet must expose a UUID
+  REQUIRE(rootFlowConfig->findProcessor("TailFile"));
+  REQUIRE(NULL != rootFlowConfig->findProcessor("TailFile")->getUUID());
+  REQUIRE(!rootFlowConfig->findProcessor("TailFile")->getUUIDStr().empty());
+  REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
+  REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessor("TailFile")->getSchedulingStrategy());
+  REQUIRE(1*1000*1000*1000 == rootFlowConfig->findProcessor("TailFile")->getSchedulingPeriodNano());
+  REQUIRE(30*1000 == rootFlowConfig->findProcessor("TailFile")->getPenalizationPeriodMsec());
+  REQUIRE(1*1000 == rootFlowConfig->findProcessor("TailFile")->getYieldPeriodMsec());
+  REQUIRE(0 == rootFlowConfig->findProcessor("TailFile")->getRunDurationNano());
+
+  std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+  rootFlowConfig->getConnections(connectionMap);
+  REQUIRE(1 == connectionMap.size());
+  // This is a map of UUID->Connection, and we don't know the generated UUID,
+  // so just loop over it
+  for (const auto &entry : connectionMap) {
+    REQUIRE(entry.second);
+    REQUIRE(!entry.second->getUUIDStr().empty());
+    REQUIRE(entry.second->getDestination());
+    REQUIRE(entry.second->getSource());
+  }
+}
+
+TEST_CASE("Test YAML Config Missing Required Fields", "[testyamlconfig2]") {
+  // The input port below deliberately omits its 'id' field
+  static const std::string TEST_YAML_NO_RPG_PORT_ID = "Flow Controller:\n"
+      "  name: MiNiFi Flow\n"
+      "Processors: []\n"
+      "Connections: []\n"
+      "Remote Processing Groups:\n"
+      "    - name: NiFi Flow\n"
+      "      comment:\n"
+      "      url: https://localhost:8090/nifi\n"
+      "      timeout: 30 secs\n"
+      "      yield period: 10 sec\n"
+      "      Input Ports:\n"
+      "          - name: tailed log\n"
+      "            comments:\n"
+      "            max concurrent tasks: 1\n"
+      "            use compression: false\n"
+      "\n";
+
+  core::YamlConfiguration yamlConfig(TEST_PROV_REPO, TEST_FF_REPO);
+  std::istringstream yamlstream(TEST_YAML_NO_RPG_PORT_ID);
+  // A missing port id must surface as std::invalid_argument
+  REQUIRE_THROWS_AS(yamlConfig.getRoot(yamlstream), std::invalid_argument);
+}


Mime
View raw message