nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jperciv...@apache.org
Subject [3/3] nifi-minifi git commit: MINIFI-107 - Process group support
Date Tue, 08 Nov 2016 20:07:42 GMT
MINIFI-107 - Process group support

This closes #50

Signed-off-by: Joseph Percivall <joepercivall@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/31855bbc
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/31855bbc
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/31855bbc

Branch: refs/heads/master
Commit: 31855bbc7ee7016b460226d24790760523e07885
Parents: 1bbeedf
Author: Bryan Rosander <brosander@apache.org>
Authored: Tue Nov 1 12:54:17 2016 -0400
Committer: Joseph Percivall <joepercivall@yahoo.com>
Committed: Tue Nov 8 15:06:47 2016 -0500

----------------------------------------------------------------------
 .../bootstrap/util/ConfigTransformer.java       | 183 ++++--
 .../bootstrap/util/ParentGroupIdResolver.java   |  87 +++
 .../bootstrap/util/ConfigTransformerTest.java   | 166 ++++-
 .../util/ParentGroupIdResolverTest.java         | 129 ++++
 .../bootstrap/util/TestConfigTransformer.java   |   3 -
 .../test/resources/config-process-groups.yml    | 276 ++++++++
 .../minifi/commons/schema/ConfigSchema.java     | 124 ++--
 .../minifi/commons/schema/ConnectionSchema.java |  41 +-
 .../nifi/minifi/commons/schema/PortSchema.java  |  31 +
 .../commons/schema/ProcessGroupSchema.java      | 153 +++++
 .../minifi/commons/schema/ProcessorSchema.java  |  26 +-
 .../commons/schema/RemoteInputPortSchema.java   |  13 +-
 .../schema/RemoteProcessingGroupSchema.java     |  15 +-
 .../commons/schema/common/BaseSchema.java       |  52 +-
 .../schema/common/BaseSchemaWithIdAndName.java  |  20 +-
 .../commons/schema/common/CollectionUtil.java   |  39 ++
 .../schema/common/CommonPropertyKeys.java       |   1 +
 .../commons/schema/common/StringUtil.java       |  34 +-
 .../exception/SchemaInstantiatonException.java  |  30 +
 .../commons/schema/ProcessGroupSchemaTest.java  |  64 ++
 .../schema/serialization/SchemaLoaderTest.java  |   4 +-
 .../commons/schema/v1/ConfigSchemaV1Test.java   |   6 +-
 .../schema/v1/ConnectionSchemaV1Test.java       |   2 +-
 .../src/main/markdown/System_Admin_Guide.md     |  35 +-
 .../src/main/resources/conf/config.yml          |   3 +
 .../toolkit/configuration/ConfigMain.java       | 135 ++--
 .../configuration/dto/ConfigSchemaFunction.java |  67 +-
 .../dto/ConnectionSchemaFunction.java           |   6 +-
 .../configuration/dto/PortSchemaFunction.java   |  46 ++
 .../dto/ProcessorSchemaFunction.java            |   6 +-
 .../toolkit/configuration/ConfigMainTest.java   |  30 +-
 .../dto/PortSchemaFunctionTest.java             |  74 +++
 .../src/test/resources/CsvToJson.yml            |   3 +
 .../resources/DecompressionCircularFlow.yml     |   3 +
 .../resources/InvokeHttpMiNiFiTemplateTest.yml  |   3 +
 .../test/resources/MultipleRelationships.yml    |   3 +
 .../ProcessGroupsAndRemoteProcessGroups.xml     | 648 +++++++++++++++++++
 .../ProcessGroupsAndRemoteProcessGroups.yml     | 276 ++++++++
 ...aceTextExpressionLanguageCSVReformatting.yml |   3 +
 .../src/test/resources/StressTestFramework.yml  |   3 +
 40 files changed, 2532 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 9794415..9fa7f05 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -27,11 +27,15 @@ import org.apache.nifi.minifi.commons.schema.ContentRepositorySchema;
 import org.apache.nifi.minifi.commons.schema.CorePropertiesSchema;
 import org.apache.nifi.minifi.commons.schema.FlowControllerSchema;
 import org.apache.nifi.minifi.commons.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.commons.schema.PortSchema;
+import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
 import org.apache.nifi.minifi.commons.schema.ProvenanceReportingSchema;
 import org.apache.nifi.minifi.commons.schema.ProvenanceRepositorySchema;
 import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.commons.schema.common.Schema;
 import org.apache.nifi.minifi.commons.schema.common.StringUtil;
 import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
 import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
@@ -68,13 +72,13 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
 public final class ConfigTransformer {
     // Underlying version of NIFI will be using
     public static final String NIFI_VERSION = "0.6.1";
+    public static final String ROOT_GROUP = "Root-Group";
 
     // Final util classes should have private constructor
     private ConfigTransformer() {
@@ -88,22 +92,25 @@ public final class ConfigTransformer {
     }
 
     public static void transformConfigFile(InputStream sourceStream, String destPath) throws Exception {
-        ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(sourceStream);
-        if (!configSchema.isValid()) {
-            throw new InvalidConfigurationException("Failed to transform config file due to:["
-                    + configSchema.getValidationIssues().stream().sorted().collect(Collectors.joining("], [")) + "]");
-        }
+        ConvertableSchema<ConfigSchema> convertableSchema = throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(sourceStream));
+        ConfigSchema configSchema = throwIfInvalid(convertableSchema.convert());
 
         // Create nifi.properties and flow.xml.gz in memory
         ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream();
         writeNiFiProperties(configSchema, nifiPropertiesOutputStream);
 
-        DOMSource flowXml = createFlowXml(configSchema);
+        writeFlowXmlFile(configSchema, destPath);
 
         // Write nifi.properties and flow.xml.gz
         writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath);
+    }
 
-        writeFlowXmlFile(flowXml, destPath);
+    private static <T extends Schema> T throwIfInvalid(T schema) throws InvalidConfigurationException {
+        if (!schema.isValid()) {
+            throw new InvalidConfigurationException("Failed to transform config file due to:["
+                    + schema.getValidationIssues().stream().sorted().collect(Collectors.joining("], [")) + "]");
+        }
+        return schema;
     }
 
     protected static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException {
@@ -118,10 +125,8 @@ public final class ConfigTransformer {
         }
     }
 
-    protected static void writeFlowXmlFile(DOMSource domSource, String path) throws IOException, TransformerException {
-        final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
-        final OutputStream outStream = new GZIPOutputStream(fileOut);
-        final StreamResult streamResult = new StreamResult(outStream);
+    protected static void writeFlowXmlFile(ConfigSchema configSchema, OutputStream outputStream) throws TransformerException, ConfigTransformerException, ConfigurationChangeException, IOException {
+        final StreamResult streamResult = new StreamResult(outputStream);
 
         // configure the transformer and convert the DOM
         final TransformerFactory transformFactory = TransformerFactory.newInstance();
@@ -130,9 +135,15 @@ public final class ConfigTransformer {
         transformer.setOutputProperty(OutputKeys.INDENT, "yes");
 
         // transform the document to byte stream
-        transformer.transform(domSource, streamResult);
-        outStream.flush();
-        outStream.close();
+        transformer.transform(createFlowXml(configSchema), streamResult);
+    }
+
+    protected static void writeFlowXmlFile(ConfigSchema configSchema, String path) throws IOException, TransformerException, ConfigurationChangeException, ConfigTransformerException {
+        try (OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"))) {
+            try (OutputStream outStream = new GZIPOutputStream(fileOut)) {
+                writeFlowXmlFile(configSchema, outStream);
+            }
+        }
     }
 
     protected static void writeNiFiProperties(ConfigSchema configSchema, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException {
@@ -280,7 +291,18 @@ public final class ConfigTransformer {
             CorePropertiesSchema coreProperties = configSchema.getCoreProperties();
             addTextElement(rootNode, "maxTimerDrivenThreadCount", String.valueOf(coreProperties.getMaxConcurrentThreads()));
             addTextElement(rootNode, "maxEventDrivenThreadCount", String.valueOf(coreProperties.getMaxConcurrentThreads()));
-            addProcessGroup(rootNode, configSchema, "rootGroup");
+
+            FlowControllerSchema flowControllerProperties = configSchema.getFlowControllerProperties();
+
+            final Element element = doc.createElement("rootGroup");
+            rootNode.appendChild(element);
+
+            ProcessGroupSchema processGroupSchema = configSchema.getProcessGroupSchema();
+            processGroupSchema.setId(ROOT_GROUP);
+            processGroupSchema.setName(flowControllerProperties.getName());
+            processGroupSchema.setComment(flowControllerProperties.getComment());
+
+            addProcessGroup(doc, element, processGroupSchema, new ParentGroupIdResolver(processGroupSchema));
 
             SecurityPropertiesSchema securityProperties = configSchema.getSecurityProperties();
             if (securityProperties.useSSL()) {
@@ -331,37 +353,38 @@ public final class ConfigTransformer {
         }
     }
 
-    protected static void addProcessGroup(final Element parentElement, ConfigSchema configSchema, final String elementName) throws ConfigurationChangeException {
+    protected static void addProcessGroup(Document doc, Element element, ProcessGroupSchema processGroupSchema, ParentGroupIdResolver parentGroupIdResolver) throws ConfigurationChangeException {
         try {
-            FlowControllerSchema flowControllerProperties = configSchema.getFlowControllerProperties();
-
-            final Document doc = parentElement.getOwnerDocument();
-            final Element element = doc.createElement(elementName);
-            parentElement.appendChild(element);
-            addTextElement(element, "id", "Root-Group");
-            addTextElement(element, "name", flowControllerProperties.getName());
+            String processGroupId = processGroupSchema.getId();
+            addTextElement(element, "id", processGroupId);
+            addTextElement(element, "name", processGroupSchema.getName());
             addPosition(element);
-            addTextElement(element, "comment", flowControllerProperties.getComment());
+            addTextElement(element, "comment", processGroupSchema.getComment());
 
-            List<ProcessorSchema> processors = configSchema.getProcessors();
-            if (processors != null) {
-                for (ProcessorSchema processorConfig : processors) {
-                    addProcessor(element, processorConfig);
-                }
+            for (ProcessorSchema processorConfig : processGroupSchema.getProcessors()) {
+                addProcessor(element, processorConfig);
             }
 
-            List<RemoteProcessingGroupSchema> remoteProcessingGroups = configSchema.getRemoteProcessingGroups();
-            if (remoteProcessingGroups != null) {
-                for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) {
-                    addRemoteProcessGroup(element, remoteProcessingGroupSchema);
-                }
+            for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : processGroupSchema.getRemoteProcessingGroups()) {
+                addRemoteProcessGroup(element, remoteProcessingGroupSchema);
             }
 
-            List<ConnectionSchema> connections = configSchema.getConnections();
-            if (connections != null) {
-                for (ConnectionSchema connectionConfig : connections) {
-                    addConnection(element, connectionConfig, configSchema);
-                }
+            for (PortSchema portSchema : processGroupSchema.getInputPortSchemas()) {
+                addPort(doc, element, portSchema, "inputPort");
+            }
+
+            for (PortSchema portSchema : processGroupSchema.getOutputPortSchemas()) {
+                addPort(doc, element, portSchema, "outputPort");
+            }
+
+            for (ProcessGroupSchema child : processGroupSchema.getProcessGroupSchemas()) {
+                Element processGroups = doc.createElement("processGroup");
+                element.appendChild(processGroups);
+                addProcessGroup(doc, processGroups, child, parentGroupIdResolver);
+            }
+
+            for (ConnectionSchema connectionConfig : processGroupSchema.getConnections()) {
+                addConnection(element, connectionConfig, parentGroupIdResolver);
             }
         } catch (ConfigurationChangeException e) {
             throw e;
@@ -370,6 +393,19 @@ public final class ConfigTransformer {
         }
     }
 
+    protected static void addPort(Document doc, Element parentElement, PortSchema portSchema, String tag) {
+        Element element = doc.createElement(tag);
+        parentElement.appendChild(element);
+
+        addTextElement(element, "id", portSchema.getId());
+        addTextElement(element, "name", portSchema.getName());
+
+        addPosition(element);
+        addTextElement(element, "comments", null);
+
+        addTextElement(element, "scheduledState", "RUNNING");
+    }
+
     protected static void addProcessor(final Element parentElement, ProcessorSchema processorConfig) throws ConfigurationChangeException {
         try {
             final Document doc = parentElement.getOwnerDocument();
@@ -511,7 +547,7 @@ public final class ConfigTransformer {
         }
     }
 
-    protected static void addConnection(final Element parentElement, ConnectionSchema connectionProperties, ConfigSchema configSchema) throws ConfigurationChangeException {
+    protected static void addConnection(final Element parentElement, ConnectionSchema connectionProperties, ParentGroupIdResolver parentGroupIdResolver) throws ConfigurationChangeException {
         try {
             final Document doc = parentElement.getOwnerDocument();
             final Element element = doc.createElement("connection");
@@ -526,23 +562,16 @@ public final class ConfigTransformer {
             addTextElement(element, "labelIndex", "1");
             addTextElement(element, "zIndex", "0");
 
-            addTextElement(element, "sourceId", connectionProperties.getSourceId());
-            addTextElement(element, "sourceGroupId", "Root-Group");
-            addTextElement(element, "sourceType", "PROCESSOR");
+            addConnectionSourceOrDestination(element, "source", connectionProperties.getSourceId(), parentGroupIdResolver);
+            addConnectionSourceOrDestination(element, "destination", connectionProperties.getDestinationId(), parentGroupIdResolver);
 
-            final String connectionDestinationId = connectionProperties.getDestinationId();
-            addTextElement(element, "destinationId", connectionDestinationId);
-            final Optional<String> parentGroup = findInputPortParentGroup(connectionDestinationId, configSchema);
-            if (parentGroup.isPresent()) {
-                addTextElement(element, "destinationGroupId", parentGroup.get());
-                addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
+            List<String> sourceRelationshipNames = connectionProperties.getSourceRelationshipNames();
+            if (sourceRelationshipNames.isEmpty()) {
+                addTextElement(element, "relationship", null);
             } else {
-                addTextElement(element, "destinationGroupId", "Root-Group");
-                addTextElement(element, "destinationType", "PROCESSOR");
-            }
-
-            for (String relationshipName : connectionProperties.getSourceRelationshipNames()) {
-                addTextElement(element, "relationship", relationshipName);
+                for (String relationshipName : sourceRelationshipNames) {
+                    addTextElement(element, "relationship", relationshipName);
+                }
             }
 
             addTextElement(element, "maxWorkQueueSize", String.valueOf(connectionProperties.getMaxWorkQueueSize()));
@@ -557,22 +586,36 @@ public final class ConfigTransformer {
         }
     }
 
-    // Locate the associated parent group for a given input port by its id
-    protected static Optional<String> findInputPortParentGroup(String inputPortId, ConfigSchema configSchema) {
-        final List<RemoteProcessingGroupSchema> remoteProcessingGroups = configSchema.getRemoteProcessingGroups();
-        if (remoteProcessingGroups != null) {
-            for (final RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) {
-                final List<RemoteInputPortSchema> remoteInputPorts = remoteProcessingGroupSchema.getInputPorts();
-                for (final RemoteInputPortSchema remoteInputPortSchema : remoteInputPorts) {
-                    if (remoteInputPortSchema != null && inputPortId.equals(remoteInputPortSchema.getId())) {
-                        return Optional.of(remoteProcessingGroupSchema.getName());
-
-                    }
+    protected static void addConnectionSourceOrDestination(Element element, String sourceOrDestination, String id, ParentGroupIdResolver parentGroupIdResolver) {
+        String idTag = sourceOrDestination + "Id";
+        String groupIdTag = sourceOrDestination + "GroupId";
+        String typeTag = sourceOrDestination + "Type";
+
+        String parentId = parentGroupIdResolver.getRemoteInputPortParentId(id);
+        String type;
+
+        if (parentId != null) {
+            type = "REMOTE_INPUT_PORT";
+        } else {
+            parentId = parentGroupIdResolver.getInputPortParentId(id);
+            if (parentId != null) {
+                type = "INPUT_PORT";
+            } else {
+                parentId = parentGroupIdResolver.getOutputPortParentId(id);
+                if (parentId != null) {
+                    type = "OUTPUT_PORT";
+                } else {
+                    parentId = parentGroupIdResolver.getProcessorParentId(id);
+                    type = "PROCESSOR";
                 }
             }
         }
 
-        return Optional.empty();
+        addTextElement(element, idTag, id);
+        if (parentId != null) {
+            addTextElement(element, groupIdTag, parentId);
+        }
+        addTextElement(element, typeTag, type);
     }
 
     protected static void addPosition(final Element parentElement) {
@@ -583,9 +626,7 @@ public final class ConfigTransformer {
     }
 
     protected static void addTextElementIfNotNullOrEmpty(final Element element, final String name, final String value) {
-        if (!StringUtil.isNullOrEmpty(value)) {
-            addTextElement(element, name, value);
-        }
+        StringUtil.doIfNotNullOrEmpty(value, s -> addTextElement(element, name, value));
     }
 
     protected static void addTextElement(final Element element, final String name, final String value) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
new file mode 100644
index 0000000..71088ee
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
@@ -0,0 +1,87 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.bootstrap.util;
+
+import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
+import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+public class ParentGroupIdResolver {
+    private final Map<String, String> processorIdToParentIdMap;
+    private final Map<String, String> inputPortIdToParentIdMap;
+    private final Map<String, String> outputPortIdToParentIdMap;
+    private final Map<String, String> remoteInputPortIdToParentIdMap;
+
+    public ParentGroupIdResolver(ProcessGroupSchema processGroupSchema) {
+        this.processorIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getProcessors);
+        this.inputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getInputPortSchemas);
+        this.outputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getOutputPortSchemas);
+        this.remoteInputPortIdToParentIdMap = getRemoteInputPortParentIdMap(processGroupSchema);
+    }
+
+    protected static Map<String, String> getParentIdMap(ProcessGroupSchema processGroupSchema, Function<ProcessGroupSchema, Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) {
+        Map<String, String> map = new HashMap<>();
+        getParentIdMap(processGroupSchema, map, schemaAccessor);
+        return map;
+    }
+
+    protected static void getParentIdMap(ProcessGroupSchema processGroupSchema, Map<String, String> output, Function<ProcessGroupSchema,
+            Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) {
+        schemaAccessor.apply(processGroupSchema).forEach(p -> output.put(p.getId(), processGroupSchema.getId()));
+        processGroupSchema.getProcessGroupSchemas().forEach(p -> getParentIdMap(p, output, schemaAccessor));
+    }
+
+    protected static Map<String, String> getRemoteInputPortParentIdMap(ProcessGroupSchema processGroupSchema) {
+        Map<String, String> result = new HashMap<>();
+        getRemoteInputPortParentIdMap(processGroupSchema, result);
+        return result;
+    }
+
+    protected static void getRemoteInputPortParentIdMap(ProcessGroupSchema processGroupSchema, Map<String, String> output) {
+        for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : processGroupSchema.getRemoteProcessingGroups()) {
+            for (RemoteInputPortSchema remoteInputPortSchema : remoteProcessingGroupSchema.getInputPorts()) {
+                output.put(remoteInputPortSchema.getId(), remoteProcessingGroupSchema.getName());
+            }
+        }
+        processGroupSchema.getProcessGroupSchemas().forEach(p -> getRemoteInputPortParentIdMap(p, output));
+    }
+
+    public String getRemoteInputPortParentId(String id) {
+        return remoteInputPortIdToParentIdMap.get(id);
+    }
+
+    public String getInputPortParentId(String id) {
+        return inputPortIdToParentIdMap.get(id);
+    }
+
+    public String getOutputPortParentId(String id) {
+        return outputPortIdToParentIdMap.get(id);
+    }
+
+    public String getProcessorParentId(String id) {
+        return processorIdToParentIdMap.get(id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index 40dbe10..cac9d16 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -20,20 +20,35 @@ package org.apache.nifi.minifi.bootstrap.util;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.commons.schema.ConfigSchema;
 import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
+import org.apache.nifi.minifi.commons.schema.PortSchema;
+import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
+import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
+import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
+import org.apache.nifi.minifi.commons.schema.common.StringUtil;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
 import org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer;
 import org.junit.Before;
 import org.junit.Test;
+import org.w3c.dom.Document;
 import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
 
+import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathConstants;
 import javax.xml.xpath.XPathExpressionException;
 import javax.xml.xpath.XPathFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -41,17 +56,21 @@ import static org.junit.Assert.assertNull;
 public class ConfigTransformerTest {
 
     private XPathFactory xPathFactory;
+    private Document document;
     private Element config;
+    private DocumentBuilder documentBuilder;
 
     @Before
     public void setup() throws ParserConfigurationException {
-        config = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument().createElement("config");
+        documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+        document = documentBuilder.newDocument();
+        config = document.createElement("config");
         xPathFactory = XPathFactory.newInstance();
     }
 
     @Test
     public void testNullQueuePrioritizerNotWritten() throws ConfigurationChangeException, XPathExpressionException {
-        ConfigTransformer.addConnection(config, new ConnectionSchema(Collections.emptyMap()), new ConfigSchema(Collections.emptyMap()));
+        ConfigTransformer.addConnection(config, new ConnectionSchema(Collections.emptyMap()), new ParentGroupIdResolver(new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME)));
         XPath xpath = xPathFactory.newXPath();
         String expression = "connection/queuePrioritizerClass";
         assertNull(xpath.evaluate(expression, config, XPathConstants.NODE));
@@ -62,7 +81,7 @@ public class ConfigTransformerTest {
         Map<String, Object> map = new HashMap<>();
         map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, "");
 
-        ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ConfigSchema(Collections.emptyMap()));
+        ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ParentGroupIdResolver(new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME)));
         XPath xpath = xPathFactory.newXPath();
         String expression = "connection/queuePrioritizerClass";
         assertNull(xpath.evaluate(expression, config, XPathConstants.NODE));
@@ -73,9 +92,148 @@ public class ConfigTransformerTest {
         Map<String, Object> map = new HashMap<>();
         map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, FirstInFirstOutPrioritizer.class.getCanonicalName());
 
-        ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ConfigSchema(Collections.emptyMap()));
+        ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ParentGroupIdResolver(new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME)));
         XPath xpath = xPathFactory.newXPath();
         String expression = "connection/queuePrioritizerClass/text()";
         assertEquals(FirstInFirstOutPrioritizer.class.getCanonicalName(), xpath.evaluate(expression, config, XPathConstants.STRING));
     }
+
+    @Test
+    public void testProcessGroupsTransform() throws Exception {
+        ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream("config-process-groups.yml"));
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ConfigTransformer.writeFlowXmlFile(configSchema, outputStream);
+        Document document = documentBuilder.parse(new ByteArrayInputStream(outputStream.toByteArray()));
+
+        testProcessGroup((Element) xPathFactory.newXPath().evaluate("flowController/rootGroup", document, XPathConstants.NODE), configSchema.getProcessGroupSchema());
+    }
+
+    private void testProcessGroup(Element element, ProcessGroupSchema processGroupSchema) throws XPathExpressionException {
+        assertEquals(processGroupSchema.getId(), getText(element, "id"));
+        assertEquals(processGroupSchema.getName(), getText(element, "name"));
+        assertEquals(nullToEmpty(processGroupSchema.getComment()), nullToEmpty(getText(element, "comment")));
+
+        NodeList processorElements = (NodeList) xPathFactory.newXPath().evaluate("processor", element, XPathConstants.NODESET);
+        assertEquals(processGroupSchema.getProcessors().size(), processorElements.getLength());
+        for (int i = 0; i < processorElements.getLength(); i++) {
+            testProcessor((Element) processorElements.item(i), processGroupSchema.getProcessors().get(i));
+        }
+
+        NodeList remoteProcessGroupElements = (NodeList) xPathFactory.newXPath().evaluate("remoteProcessGroup", element, XPathConstants.NODESET);
+        assertEquals(processGroupSchema.getRemoteProcessingGroups().size(), remoteProcessGroupElements.getLength());
+        for (int i = 0; i < remoteProcessGroupElements.getLength(); i++) {
+            testRemoteProcessGroups((Element) remoteProcessGroupElements.item(i), processGroupSchema.getRemoteProcessingGroups().get(i));
+        }
+
+        NodeList inputPortElements = (NodeList) xPathFactory.newXPath().evaluate("inputPort", element, XPathConstants.NODESET);
+        assertEquals(processGroupSchema.getInputPortSchemas().size(), inputPortElements.getLength());
+        for (int i = 0; i < inputPortElements.getLength(); i++) {
+            testPort((Element) inputPortElements.item(i), processGroupSchema.getInputPortSchemas().get(i));
+        }
+
+        NodeList outputPortElements = (NodeList) xPathFactory.newXPath().evaluate("outputPort", element, XPathConstants.NODESET);
+        assertEquals(processGroupSchema.getOutputPortSchemas().size(), outputPortElements.getLength());
+        for (int i = 0; i < outputPortElements.getLength(); i++) {
+            testPort((Element) outputPortElements.item(i), processGroupSchema.getOutputPortSchemas().get(i));
+        }
+
+        NodeList processGroupElements = (NodeList) xPathFactory.newXPath().evaluate("processGroup", element, XPathConstants.NODESET);
+        assertEquals(processGroupSchema.getProcessGroupSchemas().size(), processGroupElements.getLength());
+        for (int i = 0; i < processGroupElements.getLength(); i++) {
+            testProcessGroup((Element) processGroupElements.item(i), processGroupSchema.getProcessGroupSchemas().get(i));
+        }
+
+        NodeList connectionElements = (NodeList) xPathFactory.newXPath().evaluate("connection", element, XPathConstants.NODESET);
+        assertEquals(processGroupSchema.getConnections().size(), connectionElements.getLength());
+        for (int i = 0; i < connectionElements.getLength(); i++) {
+            testConnection((Element) connectionElements.item(i), processGroupSchema.getConnections().get(i));
+        }
+    }
+
+    private void testProcessor(Element element, ProcessorSchema processorSchema) throws XPathExpressionException {
+        assertEquals(processorSchema.getId(), getText(element, "id"));
+        assertEquals(processorSchema.getName(), getText(element, "name"));
+        assertEquals(processorSchema.getProcessorClass(), getText(element, "class"));
+        assertEquals(processorSchema.getMaxConcurrentTasks().toString(), getText(element, "maxConcurrentTasks"));
+        assertEquals(processorSchema.getSchedulingPeriod(), getText(element, "schedulingPeriod"));
+        assertEquals(processorSchema.getPenalizationPeriod(), getText(element, "penalizationPeriod"));
+        assertEquals(processorSchema.getYieldPeriod(), getText(element, "yieldPeriod"));
+        assertEquals(processorSchema.getSchedulingStrategy(), getText(element, "schedulingStrategy"));
+        assertEquals(processorSchema.getRunDurationNanos().toString(), getText(element, "runDurationNanos"));
+
+        testProperties(element, processorSchema.getProperties());
+    }
+
+    private void testRemoteProcessGroups(Element element, RemoteProcessingGroupSchema remoteProcessingGroupSchema) throws XPathExpressionException {
+        assertEquals(remoteProcessingGroupSchema.getName(), getText(element, "id"));
+        assertEquals(remoteProcessingGroupSchema.getName(), getText(element, "name"));
+        assertEquals(remoteProcessingGroupSchema.getComment(), getText(element, "comment"));
+        assertEquals(remoteProcessingGroupSchema.getUrl(), getText(element, "url"));
+        assertEquals(remoteProcessingGroupSchema.getTimeout(), getText(element, "timeout"));
+        assertEquals(remoteProcessingGroupSchema.getYieldPeriod(), getText(element, "yieldPeriod"));
+
+
+        NodeList inputPortElements = (NodeList) xPathFactory.newXPath().evaluate("inputPort", element, XPathConstants.NODESET);
+        assertEquals(remoteProcessingGroupSchema.getInputPorts().size(), inputPortElements.getLength());
+        for (int i = 0; i < inputPortElements.getLength(); i++) {
+            testRemoteInputPort((Element) inputPortElements.item(i), remoteProcessingGroupSchema.getInputPorts().get(i));
+        }
+    }
+
+    private void testRemoteInputPort(Element element, RemoteInputPortSchema remoteInputPortSchema) throws XPathExpressionException {
+        assertEquals(remoteInputPortSchema.getId(), getText(element, "id"));
+        assertEquals(remoteInputPortSchema.getName(), getText(element, "name"));
+        assertEquals(remoteInputPortSchema.getComment(), getText(element, "comment"));
+        assertEquals(remoteInputPortSchema.getMax_concurrent_tasks().toString(), getText(element, "maxConcurrentTasks"));
+        assertEquals(remoteInputPortSchema.getUseCompression(), Boolean.parseBoolean(getText(element, "useCompression")));
+    }
+
+    private void testPort(Element element, PortSchema portSchema) throws XPathExpressionException {
+        assertEquals(portSchema.getId(), getText(element, "id"));
+        assertEquals(portSchema.getName(), getText(element, "name"));
+        assertEquals("RUNNING", getText(element, "scheduledState"));
+    }
+
+    private void testConnection(Element element, ConnectionSchema connectionSchema) throws XPathExpressionException {
+        assertEquals(connectionSchema.getId(), getText(element, "id"));
+        assertEquals(connectionSchema.getName(), getText(element, "name"));
+
+        assertEquals(connectionSchema.getSourceId(), getText(element, "sourceId"));
+        assertEquals(connectionSchema.getDestinationId(), getText(element, "destinationId"));
+
+        NodeList relationshipNodes = (NodeList) xPathFactory.newXPath().evaluate("relationship", element, XPathConstants.NODESET);
+        Set<String> sourceRelationships = new HashSet<>();
+        for (int i = 0; i < relationshipNodes.getLength(); i++) {
+            String textContent = relationshipNodes.item(i).getTextContent();
+            if (!StringUtil.isNullOrEmpty(textContent)) {
+                sourceRelationships.add(textContent);
+            }
+        }
+
+        assertEquals(new HashSet<>(connectionSchema.getSourceRelationshipNames()), sourceRelationships);
+
+        assertEquals(connectionSchema.getMaxWorkQueueSize().toString(), getText(element, "maxWorkQueueSize"));
+        assertEquals(connectionSchema.getMaxWorkQueueDataSize(), getText(element, "maxWorkQueueDataSize"));
+        assertEquals(connectionSchema.getFlowfileExpiration(), getText(element, "flowFileExpiration"));
+        assertEquals(connectionSchema.getQueuePrioritizerClass(), getText(element, "queuePrioritizerClass"));
+    }
+
+    private void testProperties(Element element, Map<String, Object> expected) throws XPathExpressionException {
+        NodeList propertyElements = (NodeList) xPathFactory.newXPath().evaluate("property", element, XPathConstants.NODESET);
+        Map<String, String> properties = new HashMap<>();
+        for (int i = 0; i < propertyElements.getLength(); i++) {
+            Element item = (Element) propertyElements.item(i);
+            properties.put(getText(item, "name"), getText(item, "value"));
+        }
+        assertEquals(expected.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> nullToEmpty(e.getValue()))), properties);
+    }
+
+    private String getText(Element element, String path) throws XPathExpressionException {
+        return (String) xPathFactory.newXPath().evaluate(path + "/text()", element, XPathConstants.STRING);
+    }
+
+    private String nullToEmpty(Object val) {
+        return val == null ? "" : val.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
new file mode 100644
index 0000000..6f557b7
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
@@ -0,0 +1,129 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.bootstrap.util;
+
+import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ParentGroupIdResolverTest {
+
+    @Test
+    public void testRemoteInputPortParentId() throws IOException, SchemaLoaderException {
+        List<String> configLines = new ArrayList<>();
+        configLines.add("MiNiFi Config Version: 2");
+        configLines.add("Remote Processing Groups:");
+        configLines.add("- name: rpgOne");
+        configLines.add("  Input Ports:");
+        configLines.add("  - id: one");
+        configLines.add("Process Groups:");
+        configLines.add("- Remote Processing Groups:");
+        configLines.add("  - name: rpgTwo");
+        configLines.add("    Input Ports:");
+        configLines.add("    - id: two");
+        ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines);
+        assertEquals("rpgOne", parentGroupIdResolver.getRemoteInputPortParentId("one"));
+        assertEquals("rpgTwo", parentGroupIdResolver.getRemoteInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("one"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+    }
+
+    @Test
+    public void testInputPortParentId() throws IOException, SchemaLoaderException {
+        List<String> configLines = new ArrayList<>();
+        configLines.add("MiNiFi Config Version: 2");
+        configLines.add("Input Ports:");
+        configLines.add("- id: one");
+        configLines.add("Process Groups:");
+        configLines.add("- id: pgTwo");
+        configLines.add("  Input Ports:");
+        configLines.add("  - id: two");
+        ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines);
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two"));
+        assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getInputPortParentId("one"));
+        assertEquals("pgTwo", parentGroupIdResolver.getInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("one"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+    }
+
+    @Test
+    public void testOutputPortParentId() throws IOException, SchemaLoaderException {
+        List<String> configLines = new ArrayList<>();
+        configLines.add("MiNiFi Config Version: 2");
+        configLines.add("Output Ports:");
+        configLines.add("- id: one");
+        configLines.add("Process Groups:");
+        configLines.add("- id: pgTwo");
+        configLines.add("  Output Ports:");
+        configLines.add("  - id: two");
+        ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines);
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("two"));
+        assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getOutputPortParentId("one"));
+        assertEquals("pgTwo", parentGroupIdResolver.getOutputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("one"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+    }
+
+    @Test
+    public void testProcessorParentId() throws IOException, SchemaLoaderException {
+        List<String> configLines = new ArrayList<>();
+        configLines.add("MiNiFi Config Version: 2");
+        configLines.add("Processors:");
+        configLines.add("- id: one");
+        configLines.add("Process Groups:");
+        configLines.add("- id: pgTwo");
+        configLines.add("  Processors:");
+        configLines.add("  - id: two");
+        ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines);
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
+        assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getProcessorParentId("one"));
+        assertEquals("pgTwo", parentGroupIdResolver.getProcessorParentId("two"));
+    }
+
+    private ParentGroupIdResolver createParentGroupIdResolver(List<String> configLines) throws IOException, SchemaLoaderException {
+        return new ParentGroupIdResolver(SchemaLoader.loadConfigSchemaFromYaml(new ByteArrayInputStream(configLines.stream().collect(Collectors.joining("\n"))
+                .getBytes(StandardCharsets.UTF_8))).getProcessGroupSchema());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
index 617da90..a0077fe 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
@@ -18,8 +18,6 @@
 package org.apache.nifi.minifi.bootstrap.util;
 
 import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
-import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
-import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
 import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
 import org.junit.Test;
 
@@ -217,7 +215,6 @@ public class TestConfigTransformer {
         } catch (InvalidConfigurationException e){
             assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required], " +
                     "['scheduling strategy' in section 'Provenance Reporting' because it is not a valid scheduling strategy], " +
-                    "[" + BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, "Connections", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED) + "], " +
                     "['source name' in section 'Connections' because it was not found and it is required]", e.getMessage());
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-bootstrap/src/test/resources/config-process-groups.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/config-process-groups.yml b/minifi-bootstrap/src/test/resources/config-process-groups.yml
new file mode 100644
index 0000000..e0e5ef3
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-process-groups.yml
@@ -0,0 +1,276 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 2
+Flow Controller:
+  name: ProcessGroupsAndRemoteProcessGroups
+  comment: ''
+Core Properties:
+  flow controller graceful shutdown period: 10 sec
+  flow service write delay interval: 500 ms
+  administrative yield duration: 30 sec
+  bored yield duration: 10 millis
+  max concurrent threads: 1
+FlowFile Repository:
+  partitions: 256
+  checkpoint interval: 2 mins
+  always sync: false
+  Swap:
+    threshold: 20000
+    in period: 5 sec
+    in threads: 1
+    out period: 5 sec
+    out threads: 4
+Content Repository:
+  content claim max appendable size: 10 MB
+  content claim max flow files: 100
+  always sync: false
+Provenance Repository:
+  provenance rollover time: 1 min
+Component Status Repository:
+  buffer size: 1440
+  snapshot frequency: 1 min
+Security Properties:
+  keystore: ''
+  keystore type: ''
+  keystore password: ''
+  key password: ''
+  truststore: ''
+  truststore type: ''
+  truststore password: ''
+  ssl protocol: ''
+  Sensitive Props:
+    key:
+    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+    provider: BC
+Processors:
+- id: 207748d1-0158-1000-0000-000000000000
+  name: GenerateFlowFile
+  class: org.apache.nifi.processors.standard.GenerateFlowFile
+  max concurrent tasks: 1
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 0
+  auto-terminated relationships list: []
+  Properties:
+    Batch Size: '1'
+    Data Format: Binary
+    File Size: 1 b
+    Unique FlowFiles: 'false'
+- id: 2079e8bd-0158-1000-0000-000000000000
+  name: LogAttribute
+  class: org.apache.nifi.processors.standard.LogAttribute
+  max concurrent tasks: 1
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 0
+  auto-terminated relationships list:
+  - success
+  Properties:
+    Attributes to Ignore:
+    Attributes to Log:
+    Log Level: info
+    Log Payload: 'false'
+    Log prefix:
+- id: 2077ab1e-0158-1000-0000-000000000000
+  name: UpdateAttribute
+  class: org.apache.nifi.processors.attributes.UpdateAttribute
+  max concurrent tasks: 1
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 0
+  auto-terminated relationships list: []
+  Properties:
+    Delete Attributes Expression:
+    top: top
+Process Groups:
+- id: 207888b1-0158-1000-0000-000000000000
+  name: middle
+  Processors:
+  - id: 2078f34e-0158-1000-0000-000000000000
+    name: UpdateAttribute
+    class: org.apache.nifi.processors.attributes.UpdateAttribute
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 0 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list: []
+    Properties:
+      Delete Attributes Expression:
+      middle: middle
+  Process Groups:
+  - id: 20794cd4-0158-1000-0000-000000000000
+    name: bottom
+    Processors:
+    - id: 207a89ba-0158-1000-0000-000000000000
+      name: UpdateAttribute
+      class: org.apache.nifi.processors.attributes.UpdateAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 0 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: []
+      Properties:
+        Delete Attributes Expression:
+        bottom: bottom
+    Process Groups: []
+    Input Ports:
+    - id: 207a5f50-0158-1000-0000-000000000000
+      name: input
+    Output Ports:
+    - id: 207a6d92-0158-1000-0000-000000000000
+      name: output
+    Connections:
+    - id: 21a6abb9-0158-1000-0000-000000000000
+      name: UpdateAttribute/success/21a39aba-0158-1000-a1a0-1b55bcddcd72
+      source id: 207a89ba-0158-1000-0000-000000000000
+      source relationship names:
+      - success
+      destination id: 21a39aba-0158-1000-a1a0-1b55bcddcd72
+      max work queue size: 10000
+      max work queue data size: 1 GB
+      flowfile expiration: 0 sec
+      queue prioritizer class: ''
+    - id: 207ad5e9-0158-1000-0000-000000000000
+      name: UpdateAttribute/success/null
+      source id: 207a89ba-0158-1000-0000-000000000000
+      source relationship names:
+      - success
+      destination id: 207a6d92-0158-1000-0000-000000000000
+      max work queue size: 10000
+      max work queue data size: 1 GB
+      flowfile expiration: 0 sec
+      queue prioritizer class: ''
+    - id: 207aca0d-0158-1000-0000-000000000000
+      name: null//UpdateAttribute
+      source id: 207a5f50-0158-1000-0000-000000000000
+      source relationship names: []
+      destination id: 207a89ba-0158-1000-0000-000000000000
+      max work queue size: 10000
+      max work queue data size: 1 GB
+      flowfile expiration: 0 sec
+      queue prioritizer class: ''
+    Remote Processing Groups:
+    - name: http://localhost:9091/nifi
+      url: http://localhost:9091/nifi
+      comment: ''
+      timeout: 30 sec
+      yield period: 10 sec
+      Input Ports:
+      - id: 21a39aba-0158-1000-a1a0-1b55bcddcd72
+        name: input2
+        comment: ''
+        max concurrent tasks: 1
+        use compression: false
+  Input Ports:
+  - id: 2078c936-0158-1000-0000-000000000000
+    name: input
+  Output Ports:
+  - id: 2079b327-0158-1000-0000-000000000000
+    name: output
+  Connections:
+  - id: 21a5b1f1-0158-1000-0000-000000000000
+    name: UpdateAttribute/success/21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
+    source id: 2078f34e-0158-1000-0000-000000000000
+    source relationship names:
+    - success
+    destination id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
+    max work queue size: 10000
+    max work queue data size: 1 GB
+    flowfile expiration: 0 sec
+    queue prioritizer class: ''
+  - id: 207b0eb1-0158-1000-0000-000000000000
+    name: UpdateAttribute/success/null
+    source id: 2078f34e-0158-1000-0000-000000000000
+    source relationship names:
+    - success
+    destination id: 207a5f50-0158-1000-0000-000000000000
+    max work queue size: 10000
+    max work queue data size: 1 GB
+    flowfile expiration: 0 sec
+    queue prioritizer class: ''
+  - id: 20792ec2-0158-1000-0000-000000000000
+    name: null//UpdateAttribute
+    source id: 2078c936-0158-1000-0000-000000000000
+    source relationship names: []
+    destination id: 2078f34e-0158-1000-0000-000000000000
+    max work queue size: 10000
+    max work queue data size: 1 GB
+    flowfile expiration: 0 sec
+    queue prioritizer class: ''
+  - id: 207b1880-0158-1000-0000-000000000000
+    name: null//null
+    source id: 207a6d92-0158-1000-0000-000000000000
+    source relationship names: []
+    destination id: 2079b327-0158-1000-0000-000000000000
+    max work queue size: 10000
+    max work queue data size: 1 GB
+    flowfile expiration: 0 sec
+    queue prioritizer class: ''
+  Remote Processing Groups:
+  - name: http://localhost:9090/nifi
+    url: http://localhost:9090/nifi
+    comment: ''
+    timeout: 30 sec
+    yield period: 10 sec
+    Input Ports:
+    - id: 21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
+      name: input
+      comment: ''
+      max concurrent tasks: 1
+      use compression: false
+Input Ports: []
+Output Ports: []
+Connections:
+- id: 2077bf8f-0158-1000-0000-000000000000
+  name: GenerateFlowFile/success/UpdateAttribute
+  source id: 207748d1-0158-1000-0000-000000000000
+  source relationship names:
+  - success
+  destination id: 2077ab1e-0158-1000-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+- id: 2079cf6f-0158-1000-0000-000000000000
+  name: UpdateAttribute/success/null
+  source id: 2077ab1e-0158-1000-0000-000000000000
+  source relationship names:
+  - success
+  destination id: 2078c936-0158-1000-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+- id: 2079faa0-0158-1000-0000-000000000000
+  name: null//LogAttribute
+  source id: 2079b327-0158-1000-0000-000000000000
+  source relationship names: []
+  destination id: 2079e8bd-0158-1000-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+Remote Processing Groups: []

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index 8dfd9d4..abd6a6c 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -23,6 +23,8 @@ import org.apache.nifi.minifi.commons.schema.common.StringUtil;
 import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -31,31 +33,27 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
 
-/**
- *
- */
 public class ConfigSchema extends BaseSchema implements WritableSchema, ConvertableSchema<ConfigSchema> {
-    public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: ";
-    public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
-    public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
-    public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
-    public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in Processors and Remote Input Ports: ";
     public static final int CONFIG_VERSION = 2;
     public static final String VERSION = "MiNiFi Config Version";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS = "Found the following duplicate input port ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS = "Found the following duplicate output port ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_IDS = "Found the following ids that occur both in more than one Processor(s), Input Port(s), Output Port(s) and/or Remote Input Port(s): ";
     public static final String CONNECTION_WITH_ID = "Connection with id ";
     public static final String HAS_INVALID_SOURCE_ID = " has invalid source id ";
     public static final String HAS_INVALID_DESTINATION_ID = " has invalid destination id ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
     public static String TOP_LEVEL_NAME = "top level";
     private FlowControllerSchema flowControllerProperties;
     private CorePropertiesSchema coreProperties;
@@ -63,9 +61,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
     private ContentRepositorySchema contentRepositoryProperties;
     private ComponentStatusRepositorySchema componentStatusRepositoryProperties;
     private SecurityPropertiesSchema securityProperties;
-    private List<ProcessorSchema> processors;
-    private List<ConnectionSchema> connections;
-    private List<RemoteProcessingGroupSchema> remoteProcessingGroups;
+    private ProcessGroupSchema processGroupSchema;
     private ProvenanceReportingSchema provenanceReportingProperties;
 
     private ProvenanceRepositorySchema provenanceRepositorySchema;
@@ -85,13 +81,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         componentStatusRepositoryProperties = getMapAsType(map, COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, TOP_LEVEL_NAME, false);
         securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false);
 
-        processors = convertListToType(getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), "processor", ProcessorSchema.class, PROCESSORS_KEY);
-
-        remoteProcessingGroups = convertListToType(getOptionalKeyAsType(map, REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()), "remote processing group",
-                RemoteProcessingGroupSchema.class, REMOTE_PROCESSING_GROUPS_KEY);
-
-        connections = convertListToType(getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, TOP_LEVEL_NAME, new ArrayList<>()),
-                "connection", ConnectionSchema.class, CONNECTIONS_KEY);
+        processGroupSchema = new ProcessGroupSchema(map, TOP_LEVEL_NAME);
 
         provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false);
 
@@ -101,47 +91,63 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         addIssuesIfNotNull(contentRepositoryProperties);
         addIssuesIfNotNull(componentStatusRepositoryProperties);
         addIssuesIfNotNull(securityProperties);
+        addIssuesIfNotNull(processGroupSchema);
         addIssuesIfNotNull(provenanceReportingProperties);
         addIssuesIfNotNull(provenanceRepositorySchema);
-        addIssuesIfNotNull(processors);
-        addIssuesIfNotNull(connections);
-        addIssuesIfNotNull(remoteProcessingGroups);
 
-        Set<String> processorIds = new HashSet<>();
-        List<String> processorIdList = processors.stream().map(ProcessorSchema::getId).collect(Collectors.toList());
-        processorIds.addAll(processorIdList);
+        List<ProcessGroupSchema> allProcessGroups = getAllProcessGroups(processGroupSchema);
+        List<ConnectionSchema> allConnectionSchemas = allProcessGroups.stream().flatMap(p -> p.getConnections().stream()).collect(Collectors.toList());
+        List<RemoteProcessingGroupSchema> allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList());
 
-        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, processorIdList);
-        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, connections.stream().map(ConnectionSchema::getId).collect(Collectors.toList()));
-        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES,
-                remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList()));
-
-        Set<String> remoteInputPortIds = new HashSet<>();
-        List<String> remoteInputPortIdList = remoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
+        List<String> allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
+        List<String> allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
+        List<String> allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList());
+        List<String> allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
                 .flatMap(r -> r.getInputPorts().stream()).map(RemoteInputPortSchema::getId).collect(Collectors.toList());
-        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, remoteInputPortIdList);
-        remoteInputPortIds.addAll(remoteInputPortIdList);
-
-        Set<String> duplicateIds = new HashSet<>(processorIds);
-        duplicateIds.retainAll(remoteInputPortIds);
-        if (duplicateIds.size() > 0) {
-            addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + duplicateIds.stream().sorted().collect(Collectors.joining(", ")));
+        List<String> allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
+        List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
+
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS, allInputPortIds);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
+
+        // Potential connection sources and destinations need to have unique ids
+        OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds));
+        if (overlapResults.duplicates.size() > 0) {
+            addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", ")));
         }
 
-        Set<String> connectableIds = new HashSet<>(processorIds);
-        connectableIds.addAll(remoteInputPortIds);
-        connections.forEach(c -> {
+        allConnectionSchemas.forEach(c -> {
             String destinationId = c.getDestinationId();
-            if (!StringUtil.isNullOrEmpty(destinationId) && !connectableIds.contains(destinationId)) {
+            if (!StringUtil.isNullOrEmpty(destinationId) && !overlapResults.seen.contains(destinationId)) {
                 addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_DESTINATION_ID + destinationId);
             }
             String sourceId = c.getSourceId();
-            if (!StringUtil.isNullOrEmpty(sourceId) && !connectableIds.contains(sourceId)) {
+            if (!StringUtil.isNullOrEmpty(sourceId) && !overlapResults.seen.contains(sourceId)) {
                 addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_SOURCE_ID + sourceId);
             }
         });
     }
 
+    protected static <T> OverlapResults<T> findOverlap(Collection<T>... collections) {
+        Set<T> seen = new HashSet<>();
+        return new OverlapResults<>(seen, Arrays.stream(collections).flatMap(c -> c.stream()).sequential().filter(s -> !seen.add(s)).collect(Collectors.toSet()));
+    }
+
+    public static List<ProcessGroupSchema> getAllProcessGroups(ProcessGroupSchema processGroupSchema) {
+        List<ProcessGroupSchema> result = new ArrayList<>();
+        addProcessGroups(processGroupSchema, result);
+        return result;
+    }
+
+    private static void addProcessGroups(ProcessGroupSchema processGroupSchema, List<ProcessGroupSchema> result) {
+        result.add(processGroupSchema);
+        processGroupSchema.getProcessGroupSchemas().forEach(p -> addProcessGroups(p, result));
+    }
+
     public Map<String, Object> toMap() {
         Map<String, Object> result = mapSupplier.get();
         result.put(VERSION, getVersion());
@@ -152,9 +158,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         putIfNotNull(result, PROVENANCE_REPO_KEY, provenanceRepositorySchema);
         putIfNotNull(result, COMPONENT_STATUS_REPO_KEY, componentStatusRepositoryProperties);
         putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties);
-        putListIfNotNull(result, PROCESSORS_KEY, processors);
-        putListIfNotNull(result, CONNECTIONS_KEY, connections);
-        putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups);
+        result.putAll(processGroupSchema.toMap());
         putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties);
         return result;
     }
@@ -179,16 +183,8 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         return securityProperties;
     }
 
-    public List<ProcessorSchema> getProcessors() {
-        return processors;
-    }
-
-    public List<ConnectionSchema> getConnections() {
-        return connections;
-    }
-
-    public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() {
-        return remoteProcessingGroups;
+    public ProcessGroupSchema getProcessGroupSchema() {
+        return processGroupSchema;
     }
 
     public ProvenanceReportingSchema getProvenanceReportingProperties() {
@@ -212,4 +208,14 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
     public ConfigSchema convert() {
         return this;
     }
+
+    private static class OverlapResults<T> {
+        private final Set<T> seen;
+        private final Set<T> duplicates;
+
+        private OverlapResults(Set<T> seen, Set<T> duplicates) {
+            this.seen = seen;
+            this.duplicates = duplicates;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java
index 768213c..47a87d9 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java
@@ -25,8 +25,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
-
 public class ConnectionSchema extends BaseSchemaWithIdAndName {
     public static final String SOURCE_ID_KEY = "source id";
     public static final String SOURCE_RELATIONSHIP_NAMES_KEY = "source relationship names";
@@ -41,6 +39,7 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName {
     public static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
 
     private String sourceId;
+    private boolean needsSourceRelationships = true;
     private List<String> sourceRelationshipNames;
     private String destinationId;
 
@@ -50,21 +49,19 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName {
     private String queuePrioritizerClass;
 
     public ConnectionSchema(Map map) {
-        super(map, CONNECTIONS_KEY);
+        super(map, "Connection(id: {id}, name: {name})");
 
+        String wrapperName = getWrapperName();
         // In case of older version, these may not be available until after construction, validated in getValidationIssues()
-        sourceId = getOptionalKeyAsType(map, SOURCE_ID_KEY, String.class, CONNECTIONS_KEY, "");
-        destinationId = getOptionalKeyAsType(map, DESTINATION_ID_KEY, String.class, CONNECTIONS_KEY, "");
-
-        sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, CONNECTIONS_KEY, new ArrayList<>());
-        if (sourceRelationshipNames.isEmpty()) {
-            addValidationIssue("Expected at least one value in " + SOURCE_RELATIONSHIP_NAMES_KEY + " for " + CONNECTIONS_KEY + " " + getName());
-        }
-
-        maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, CONNECTIONS_KEY, DEFAULT_MAX_WORK_QUEUE_SIZE);
-        maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, CONNECTIONS_KEY, DEFAULT_MAX_QUEUE_DATA_SIZE);
-        flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, CONNECTIONS_KEY, DEFAULT_FLOWFILE_EXPIRATION);
-        queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, CONNECTIONS_KEY, "");
+        sourceId = getOptionalKeyAsType(map, SOURCE_ID_KEY, String.class, wrapperName, "");
+        destinationId = getOptionalKeyAsType(map, DESTINATION_ID_KEY, String.class, wrapperName, "");
+
+        // This could be empty if the source is a port.
+        sourceRelationshipNames = getOptionalKeyAsType(map, SOURCE_RELATIONSHIP_NAMES_KEY, List.class, wrapperName, new ArrayList<>());
+        maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, wrapperName, DEFAULT_MAX_WORK_QUEUE_SIZE);
+        maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, wrapperName, DEFAULT_MAX_QUEUE_DATA_SIZE);
+        flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, wrapperName, DEFAULT_FLOWFILE_EXPIRATION);
+        queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, wrapperName, "");
     }
 
     @Override
@@ -117,14 +114,18 @@ public class ConnectionSchema extends BaseSchemaWithIdAndName {
         return queuePrioritizerClass;
     }
 
+    public void setNeedsSourceRelationships(boolean needsSourceRelationships) {
+        this.needsSourceRelationships = needsSourceRelationships;
+    }
+
     @Override
     public List<String> getValidationIssues() {
+        String wrapperName = getWrapperName();
         List<String> validationIssues = super.getValidationIssues();
-        if (StringUtil.isNullOrEmpty(getSourceId())) {
-            validationIssues.add(getIssueText(SOURCE_ID_KEY, CONNECTIONS_KEY, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
-        }
-        if (StringUtil.isNullOrEmpty(getDestinationId())) {
-            validationIssues.add(getIssueText(DESTINATION_ID_KEY, CONNECTIONS_KEY, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
+        StringUtil.doIfNullOrEmpty(getSourceId(), id -> validationIssues.add(getIssueText(SOURCE_ID_KEY, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED)));
+        StringUtil.doIfNullOrEmpty(getDestinationId(), id -> validationIssues.add(getIssueText(DESTINATION_ID_KEY, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED)));
+        if (needsSourceRelationships && sourceRelationshipNames.isEmpty()) {
+            validationIssues.add("Expected at least one value in " + SOURCE_RELATIONSHIP_NAMES_KEY + " for " + wrapperName + " " + getName());
         }
         return Collections.unmodifiableList(validationIssues);
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java
new file mode 100644
index 0000000..27c6afc
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/PortSchema.java
@@ -0,0 +1,31 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+
+import java.util.Map;
+
+public class PortSchema extends BaseSchemaWithIdAndName {
+
+    public PortSchema(Map map, String wrapperName) {
+        super(map, wrapperName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
new file mode 100644
index 0000000..88688fc
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
@@ -0,0 +1,153 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.commons.schema.common.StringUtil;
+import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.OUTPUT_PORTS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
+
+public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements WritableSchema, ConvertableSchema<ProcessGroupSchema> {
+
+    public static final String PROCESS_GROUPS_KEY = "Process Groups";
+    public static final String ID_DEFAULT = "Root-Group";
+
+    private String comment;
+    private List<ProcessorSchema> processors;
+    private List<ConnectionSchema> connections;
+    private List<RemoteProcessingGroupSchema> remoteProcessingGroups;
+    private List<ProcessGroupSchema> processGroupSchemas;
+    private List<PortSchema> inputPortSchemas;
+    private List<PortSchema> outputPortSchemas;
+
+    public ProcessGroupSchema(Map map, String wrapperName) {
+        super(map, wrapperName);
+
+        processors = getOptionalKeyAsList(map, PROCESSORS_KEY, ProcessorSchema::new, wrapperName);
+        remoteProcessingGroups = getOptionalKeyAsList(map, REMOTE_PROCESSING_GROUPS_KEY, RemoteProcessingGroupSchema::new, wrapperName);
+        connections = getOptionalKeyAsList(map, CONNECTIONS_KEY, ConnectionSchema::new, wrapperName);
+        inputPortSchemas = getOptionalKeyAsList(map, INPUT_PORTS_KEY, m -> new PortSchema(m, "InputPort(id: {id}, name: {name})"), wrapperName);
+        outputPortSchemas = getOptionalKeyAsList(map, OUTPUT_PORTS_KEY, m -> new PortSchema(m, "OutputPort(id: {id}, name: {name})"), wrapperName);
+        processGroupSchemas = getOptionalKeyAsList(map, PROCESS_GROUPS_KEY, m -> new ProcessGroupSchema(m, "ProcessGroup(id: {id}, name: {name})"), wrapperName);
+
+        if (ConfigSchema.TOP_LEVEL_NAME.equals(wrapperName)) {
+            if (inputPortSchemas.size() > 0) {
+                addValidationIssue(INPUT_PORTS_KEY, wrapperName, "must be empty in root group as external input/output ports are currently unsupported");
+            }
+            if (outputPortSchemas.size() > 0) {
+                addValidationIssue(OUTPUT_PORTS_KEY, wrapperName, "must be empty in root group as external input/output ports are currently unsupported");
+            }
+        } else if (ID_DEFAULT.equals(getId())) {
+            addValidationIssue(ID_KEY, wrapperName, "must be set to a value not " + ID_DEFAULT + " if not in root group");
+        }
+
+        Set<String> portIds = getPortIds();
+        connections.stream().filter(c -> portIds.contains(c.getSourceId())).forEachOrdered(c -> c.setNeedsSourceRelationships(false));
+
+        addIssuesIfNotNull(processors);
+        addIssuesIfNotNull(remoteProcessingGroups);
+        addIssuesIfNotNull(processGroupSchemas);
+        addIssuesIfNotNull(connections);
+    }
+
+    public Map<String, Object> toMap() {
+        Map<String, Object> result = mapSupplier.get();
+        String id = getId();
+        if (!ID_DEFAULT.equals(id)) {
+            result.put(ID_KEY, id);
+        }
+        StringUtil.doIfNotNullOrEmpty(getName(), name -> result.put(NAME_KEY, name));
+        putListIfNotNull(result, PROCESSORS_KEY, processors);
+        putListIfNotNull(result, PROCESS_GROUPS_KEY, processGroupSchemas);
+        putListIfNotNull(result, INPUT_PORTS_KEY, inputPortSchemas);
+        putListIfNotNull(result, OUTPUT_PORTS_KEY, outputPortSchemas);
+        putListIfNotNull(result, CONNECTIONS_KEY, connections);
+        putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups);
+        return result;
+    }
+
+    public List<ProcessorSchema> getProcessors() {
+        return processors;
+    }
+
+    public List<ConnectionSchema> getConnections() {
+        return connections;
+    }
+
+    public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() {
+        return remoteProcessingGroups;
+    }
+
+    public List<ProcessGroupSchema> getProcessGroupSchemas() {
+        return processGroupSchemas;
+    }
+
+    public Set<String> getPortIds() {
+        Set<String> result = new HashSet<>();
+        inputPortSchemas.stream().map(PortSchema::getId).forEachOrdered(result::add);
+        outputPortSchemas.stream().map(PortSchema::getId).forEachOrdered(result::add);
+        processGroupSchemas.stream().flatMap(p -> p.getPortIds().stream()).forEachOrdered(result::add);
+        return result;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public void setComment(String comment) {
+        this.comment = comment;
+    }
+
+    @Override
+    protected String getId(Map map, String wrapperName) {
+        return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, ID_DEFAULT);
+    }
+
+    @Override
+    public ProcessGroupSchema convert() {
+        return this;
+    }
+
+    @Override
+    public int getVersion() {
+        return ConfigSchema.CONFIG_VERSION;
+    }
+
+    public List<PortSchema> getOutputPortSchemas() {
+        return outputPortSchemas;
+    }
+
+    public List<PortSchema> getInputPortSchemas() {
+        return inputPortSchemas;
+    }
+}


Mime
View raw message