nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mcgil...@apache.org
Subject [3/4] nifi git commit: NIFI-1800: Tie Controller Services to Process Groups. This closes #431
Date Wed, 11 May 2016 18:56:43 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
deleted file mode 100644
index 5d04cc2..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.BufferedOutputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.connectable.Funnel;
-import org.apache.nifi.connectable.Port;
-import org.apache.nifi.connectable.Position;
-import org.apache.nifi.connectable.Size;
-import org.apache.nifi.controller.label.Label;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.RootGroupPort;
-import org.w3c.dom.DOMException;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-
-/**
- * Serializes a Flow Controller as XML to an output stream.
- *
- * NOT THREAD-SAFE.
- */
-public class StandardFlowSerializer implements FlowSerializer {
-
-    private final StringEncryptor encryptor;
-
-    public StandardFlowSerializer(final StringEncryptor encryptor) {
-        this.encryptor = encryptor;
-    }
-
-    @Override
-    public void serialize(final FlowController controller, final OutputStream os) throws FlowSerializationException {
-        try {
-            // create a new, empty document
-            final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
-            docFactory.setNamespaceAware(true);
-
-            final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
-            final Document doc = docBuilder.newDocument();
-
-            // populate document with controller state
-            final Element rootNode = doc.createElement("flowController");
-            doc.appendChild(rootNode);
-            addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
-            addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
-            addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
-
-            final Element controllerServicesNode = doc.createElement("controllerServices");
-            rootNode.appendChild(controllerServicesNode);
-            for (final ControllerServiceNode serviceNode : controller.getAllControllerServices()) {
-                addControllerService(controllerServicesNode, serviceNode, encryptor);
-            }
-
-            final Element reportingTasksNode = doc.createElement("reportingTasks");
-            rootNode.appendChild(reportingTasksNode);
-            for (final ReportingTaskNode taskNode : controller.getAllReportingTasks()) {
-                addReportingTask(reportingTasksNode, taskNode, encryptor);
-            }
-
-            final DOMSource domSource = new DOMSource(doc);
-            final StreamResult streamResult = new StreamResult(new BufferedOutputStream(os));
-
-            // configure the transformer and convert the DOM
-            final TransformerFactory transformFactory = TransformerFactory.newInstance();
-            final Transformer transformer = transformFactory.newTransformer();
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
-            // transform the document to byte stream
-            transformer.transform(domSource, streamResult);
-
-        } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) {
-            throw new FlowSerializationException(e);
-        }
-    }
-
-    private void addSize(final Element parentElement, final Size size) {
-        final Element element = parentElement.getOwnerDocument().createElement("size");
-        element.setAttribute("width", String.valueOf(size.getWidth()));
-        element.setAttribute("height", String.valueOf(size.getHeight()));
-        parentElement.appendChild(element);
-    }
-
-    private void addPosition(final Element parentElement, final Position position) {
-        addPosition(parentElement, position, "position");
-    }
-
-    private void addPosition(final Element parentElement, final Position position, final String elementName) {
-        final Element element = parentElement.getOwnerDocument().createElement(elementName);
-        element.setAttribute("x", String.valueOf(position.getX()));
-        element.setAttribute("y", String.valueOf(position.getY()));
-        parentElement.appendChild(element);
-    }
-
-    private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", group.getIdentifier());
-        addTextElement(element, "name", group.getName());
-        addPosition(element, group.getPosition());
-        addTextElement(element, "comment", group.getComments());
-
-        for (final ProcessorNode processor : group.getProcessors()) {
-            addProcessor(element, processor);
-        }
-
-        if (group.isRootGroup()) {
-            for (final Port port : group.getInputPorts()) {
-                addRootGroupPort(element, (RootGroupPort) port, "inputPort");
-            }
-
-            for (final Port port : group.getOutputPorts()) {
-                addRootGroupPort(element, (RootGroupPort) port, "outputPort");
-            }
-        } else {
-            for (final Port port : group.getInputPorts()) {
-                addPort(element, port, "inputPort");
-            }
-
-            for (final Port port : group.getOutputPorts()) {
-                addPort(element, port, "outputPort");
-            }
-        }
-
-        for (final Label label : group.getLabels()) {
-            addLabel(element, label);
-        }
-
-        for (final Funnel funnel : group.getFunnels()) {
-            addFunnel(element, funnel);
-        }
-
-        for (final ProcessGroup childGroup : group.getProcessGroups()) {
-            addProcessGroup(element, childGroup, "processGroup");
-        }
-
-        for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
-            addRemoteProcessGroup(element, remoteRef);
-        }
-
-        for (final Connection connection : group.getConnections()) {
-            addConnection(element, connection);
-        }
-    }
-
-    private void addStyle(final Element parentElement, final Map<String, String> style) {
-        final Element element = parentElement.getOwnerDocument().createElement("styles");
-
-        for (final Map.Entry<String, String> entry : style.entrySet()) {
-            final Element styleElement = parentElement.getOwnerDocument().createElement("style");
-            styleElement.setAttribute("name", entry.getKey());
-            styleElement.setTextContent(entry.getValue());
-            element.appendChild(styleElement);
-        }
-
-        parentElement.appendChild(element);
-    }
-
-    private void addLabel(final Element parentElement, final Label label) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("label");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", label.getIdentifier());
-
-        addPosition(element, label.getPosition());
-        addSize(element, label.getSize());
-        addStyle(element, label.getStyle());
-
-        addTextElement(element, "value", label.getValue());
-        parentElement.appendChild(element);
-    }
-
-    private void addFunnel(final Element parentElement, final Funnel funnel) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("funnel");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", funnel.getIdentifier());
-        addPosition(element, funnel.getPosition());
-    }
-
-    private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("remoteProcessGroup");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", remoteRef.getIdentifier());
-        addTextElement(element, "name", remoteRef.getName());
-        addPosition(element, remoteRef.getPosition());
-        addTextElement(element, "comment", remoteRef.getComments());
-        addTextElement(element, "url", remoteRef.getTargetUri().toString());
-        addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout());
-        addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
-        addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting()));
-
-        for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
-            if (port.hasIncomingConnection()) {
-                addRemoteGroupPort(element, port, "inputPort");
-            }
-        }
-
-        for (final RemoteGroupPort port : remoteRef.getOutputPorts()) {
-            if (!port.getConnections().isEmpty()) {
-                addRemoteGroupPort(element, port, "outputPort");
-            }
-        }
-
-        parentElement.appendChild(element);
-    }
-
-    private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", port.getIdentifier());
-        addTextElement(element, "name", port.getName());
-        addPosition(element, port.getPosition());
-        addTextElement(element, "comments", port.getComments());
-        addTextElement(element, "scheduledState", port.getScheduledState().name());
-        addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
-        addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
-
-        parentElement.appendChild(element);
-    }
-
-    private void addPort(final Element parentElement, final Port port, final String elementName) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", port.getIdentifier());
-        addTextElement(element, "name", port.getName());
-        addPosition(element, port.getPosition());
-        addTextElement(element, "comments", port.getComments());
-        addTextElement(element, "scheduledState", port.getScheduledState().name());
-
-        parentElement.appendChild(element);
-    }
-
-    private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", port.getIdentifier());
-        addTextElement(element, "name", port.getName());
-        addPosition(element, port.getPosition());
-        addTextElement(element, "comments", port.getComments());
-        addTextElement(element, "scheduledState", port.getScheduledState().name());
-        addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks()));
-        for (final String user : port.getUserAccessControl()) {
-            addTextElement(element, "userAccessControl", user);
-        }
-        for (final String group : port.getGroupAccessControl()) {
-            addTextElement(element, "groupAccessControl", group);
-        }
-
-        parentElement.appendChild(element);
-    }
-
-    private void addProcessor(final Element parentElement, final ProcessorNode processor) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("processor");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", processor.getIdentifier());
-        addTextElement(element, "name", processor.getName());
-
-        addPosition(element, processor.getPosition());
-        addStyle(element, processor.getStyle());
-
-        addTextElement(element, "comment", processor.getComments());
-        addTextElement(element, "class", processor.getProcessor().getClass().getCanonicalName());
-        addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks());
-        addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod());
-        addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod());
-        addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
-        addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
-        addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
-        addTextElement(element, "scheduledState", processor.getScheduledState().name());
-        addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
-        addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
-
-        addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);
-
-        for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
-            addTextElement(element, "autoTerminatedRelationship", rel.getName());
-        }
-    }
-
-    private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final StringEncryptor encryptor) {
-        final Document doc = element.getOwnerDocument();
-        for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
-            final PropertyDescriptor descriptor = entry.getKey();
-            String value = entry.getValue();
-
-            if (value != null && descriptor.isSensitive()) {
-                value = ENC_PREFIX + encryptor.encrypt(value) + ENC_SUFFIX;
-            }
-
-            if (value == null) {
-                value = descriptor.getDefaultValue();
-            }
-
-            final Element propElement = doc.createElement("property");
-            addTextElement(propElement, "name", descriptor.getName());
-            if (value != null) {
-                addTextElement(propElement, "value", value);
-            }
-
-            element.appendChild(propElement);
-        }
-
-        if (annotationData != null) {
-            addTextElement(element, "annotationData", annotationData);
-        }
-    }
-
-    private void addConnection(final Element parentElement, final Connection connection) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("connection");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", connection.getIdentifier());
-        addTextElement(element, "name", connection.getName());
-
-        final Element bendPointsElement = doc.createElement("bendPoints");
-        element.appendChild(bendPointsElement);
-        for (final Position bendPoint : connection.getBendPoints()) {
-            addPosition(bendPointsElement, bendPoint, "bendPoint");
-        }
-
-        addTextElement(element, "labelIndex", connection.getLabelIndex());
-        addTextElement(element, "zIndex", connection.getZIndex());
-
-        final String sourceId = connection.getSource().getIdentifier();
-        final ConnectableType sourceType = connection.getSource().getConnectableType();
-        final String sourceGroupId;
-        if (sourceType == ConnectableType.REMOTE_OUTPUT_PORT) {
-            sourceGroupId = ((RemoteGroupPort) connection.getSource()).getRemoteProcessGroup().getIdentifier();
-        } else {
-            sourceGroupId = connection.getSource().getProcessGroup().getIdentifier();
-        }
-
-        final ConnectableType destinationType = connection.getDestination().getConnectableType();
-        final String destinationId = connection.getDestination().getIdentifier();
-        final String destinationGroupId;
-        if (destinationType == ConnectableType.REMOTE_INPUT_PORT) {
-            destinationGroupId = ((RemoteGroupPort) connection.getDestination()).getRemoteProcessGroup().getIdentifier();
-        } else {
-            destinationGroupId = connection.getDestination().getProcessGroup().getIdentifier();
-        }
-
-        addTextElement(element, "sourceId", sourceId);
-        addTextElement(element, "sourceGroupId", sourceGroupId);
-        addTextElement(element, "sourceType", sourceType.toString());
-
-        addTextElement(element, "destinationId", destinationId);
-        addTextElement(element, "destinationGroupId", destinationGroupId);
-        addTextElement(element, "destinationType", destinationType.toString());
-
-        for (final Relationship relationship : connection.getRelationships()) {
-            addTextElement(element, "relationship", relationship.getName());
-        }
-
-        addTextElement(element, "maxWorkQueueSize", connection.getFlowFileQueue().getBackPressureObjectThreshold());
-        addTextElement(element, "maxWorkQueueDataSize", connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
-
-        addTextElement(element, "flowFileExpiration", connection.getFlowFileQueue().getFlowFileExpiration());
-        for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) {
-            final String className = comparator.getClass().getCanonicalName();
-            addTextElement(element, "queuePrioritizerClass", className);
-        }
-
-        parentElement.appendChild(element);
-    }
-
-    public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) {
-        final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
-        addTextElement(serviceElement, "id", serviceNode.getIdentifier());
-        addTextElement(serviceElement, "name", serviceNode.getName());
-        addTextElement(serviceElement, "comment", serviceNode.getComments());
-        addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName());
-
-        final ControllerServiceState state = serviceNode.getState();
-        final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
-        addTextElement(serviceElement, "enabled", String.valueOf(enabled));
-
-        addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor);
-
-        element.appendChild(serviceElement);
-    }
-
-    public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final StringEncryptor encryptor) {
-        final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
-        addTextElement(taskElement, "id", taskNode.getIdentifier());
-        addTextElement(taskElement, "name", taskNode.getName());
-        addTextElement(taskElement, "comment", taskNode.getComments());
-        addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName());
-        addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod());
-        addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name());
-        addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());
-
-        addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor);
-
-        element.appendChild(taskElement);
-    }
-
-    private static void addTextElement(final Element element, final String name, final long value) {
-        addTextElement(element, name, String.valueOf(value));
-    }
-
-    private static void addTextElement(final Element element, final String name, final String value) {
-        final Document doc = element.getOwnerDocument();
-        final Element toAdd = doc.createElement(name);
-        toAdd.setTextContent(value);
-        element.appendChild(toAdd);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index fb72683..f8b3262 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -68,6 +68,8 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.cluster.Heartbeater;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.controller.serialization.FlowSynchronizationException;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index f03b013..97ed3a9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -55,6 +55,12 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.serialization.FlowEncodingVersion;
+import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.controller.serialization.FlowSynchronizationException;
+import org.apache.nifi.controller.serialization.FlowSynchronizer;
+import org.apache.nifi.controller.serialization.StandardFlowSerializer;
 import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
@@ -123,19 +129,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         final Element rootElement = document.getDocumentElement();
 
         final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
-        final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
+        final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement);
+
+        final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
         return isEmpty(rootGroupDto);
     }
 
     @Override
     public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor)
             throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
-        // get the controller's root group
-        final ProcessGroup rootGroup = controller.getGroup(controller.getRootGroupId());
-
         // handle corner cases involving no proposed flow
         if (proposedFlow == null) {
-            if (rootGroup.isEmpty()) {
+            if (controller.getGroup(controller.getRootGroupId()).isEmpty()) {
                 return;  // no sync to perform
             } else {
                 throw new UninheritableFlowException("Proposed configuration is empty, but the controller contains a data flow.");
@@ -160,6 +165,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                 } else {
                     final Document document = parseFlowBytes(existingFlow);
                     final Element rootElement = document.getDocumentElement();
+                    final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement);
 
                     logger.trace("Setting controller thread counts");
                     final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
@@ -180,17 +186,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                     }
 
                     final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
-                    final List<Element> controllerServiceElements;
+                    final List<Element> unrootedControllerServiceElements;
                     if (controllerServicesElement == null) {
-                        controllerServiceElements = Collections.emptyList();
+                        unrootedControllerServiceElements = Collections.emptyList();
                     } else {
-                        controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
+                        unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
                     }
 
                     logger.trace("Parsing process group from DOM");
                     final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
-                    final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
-                    existingFlowEmpty = taskElements.isEmpty() && controllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
+                    final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
+                    existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
                     logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
                 }
             }
@@ -237,6 +243,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                 synchronized (configuration) {
                     // get the root element
                     final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
+                    final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootElement);
 
                     // set controller config
                     logger.trace("Updating flow config");
@@ -252,12 +259,27 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                     // get the root group XML element
                     final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
 
+                    // if this controller isn't initialized or its empty, add the root group, otherwise update
+                    final ProcessGroup rootGroup;
+                    if (!initialized || existingFlowEmpty) {
+                        logger.trace("Adding root process group");
+                        rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
+                    } else {
+                        logger.trace("Updating root process group");
+                        rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
+                    }
+
                     final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
                     if (controllerServicesElement != null) {
                         final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
 
                         if (!initialized || existingFlowEmpty) {
-                            ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState);
+                            // If the encoding version is null, we are loading a flow from NiFi 0.x, where Controller
+                            // Services could not be scoped by Process Group. As a result, we want to move the Process Groups
+                            // to the root Group. Otherwise, we want to use a null group, which indicates a Controller-level
+                            // Controller Service.
+                            final ProcessGroup group = (encodingVersion == null) ? rootGroup : null;
+                            ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor, controller.getBulletinRepository(), autoResumeState);
                         } else {
                             for (final Element serviceElement : serviceElements) {
                                 updateControllerService(controller, serviceElement, encryptor);
@@ -265,15 +287,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                         }
                     }
 
-                    // if this controller isn't initialized or its emtpy, add the root group, otherwise update
-                    if (!initialized || existingFlowEmpty) {
-                        logger.trace("Adding root process group");
-                        addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
-                    } else {
-                        logger.trace("Updating root process group");
-                        updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
-                    }
-
                     final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks");
                     if (reportingTasksElement != null) {
                         final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
@@ -488,14 +501,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         }
     }
 
-    private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor)
-            throws ProcessorInstantiationException {
+    private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
+        final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException {
 
         // get the parent group ID
         final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
 
         // get the process group
-        final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor);
+        final ProcessGroupDTO processGroupDto = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
 
         // update the process group
         if (parentId == null) {
@@ -636,7 +649,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         // update nested process groups (recursively)
         final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
         for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
-            updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor);
+            updateProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
         }
 
         // update connections
@@ -692,6 +705,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             }
         }
 
+        // Update Controller Services
+        final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
+        for (final Element serviceNodeElement : serviceNodeList) {
+            updateControllerService(controller, serviceNodeElement, encryptor);
+        }
+
         return processGroup;
     }
 
@@ -749,13 +768,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         }
     }
 
-    private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor)
-            throws ProcessorInstantiationException {
+    private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
+        final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException {
         // get the parent group ID
         final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier();
 
         // add the process group
-        final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor);
+        final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
         final ProcessGroup processGroup = controller.createProcessGroup(processGroupDTO.getId());
         processGroup.setComments(processGroupDTO.getComments());
         processGroup.setPosition(toPosition(processGroupDTO.getPosition()));
@@ -892,7 +911,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         // add nested process groups (recursively)
         final List<Element> nestedProcessGroupNodeList = getChildrenByTagName(processGroupElement, "processGroup");
         for (final Element nestedProcessGroupElement : nestedProcessGroupNodeList) {
-            addProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor);
+            addProcessGroup(controller, processGroup, nestedProcessGroupElement, encryptor, encodingVersion);
         }
 
         // add remote process group
@@ -1027,6 +1046,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             processGroup.addConnection(connection);
         }
 
+        // Add Controller Services
+        final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
+        if (!serviceNodeList.isEmpty()) {
+            ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState);
+        }
+
         return processGroup;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 0d168ec..346f3db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -918,8 +918,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     @Override
     public boolean isValid() {
         try {
-            final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(),
-                    getAnnotationData());
+            final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier());
 
             final Collection<ValidationResult> validationResults;
             try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@@ -966,7 +965,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
         final List<ValidationResult> results = new ArrayList<>();
         try {
             final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(),
-                    getAnnotationData());
+                getAnnotationData(), getProcessGroup().getIdentifier());
 
             final Collection<ValidationResult> validationResults;
             try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@@ -1421,4 +1420,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             callback.postMonitor();
         }
     }
+
+    @Override
+    protected String getProcessGroupIdentifier() {
+        final ProcessGroup group = getProcessGroup();
+        return group == null ? null : group.getIdentifier();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index c3eb0a0..6aead13 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -254,4 +254,9 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     public String toString() {
         return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]";
     }
+
+    @Override
+    protected String getProcessGroupIdentifier() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 11d1b51..ff767ef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -115,8 +115,8 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
     }
 
     @Override
-    public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
-        return serviceProvider.getControllerServiceIdentifiers(serviceType);
+    public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
+        return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
index 0131a95..8b7b3bf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
@@ -69,13 +69,14 @@ public class StandardReportingInitializationContext implements ReportingInitiali
         return schedulingPeriod;
     }
 
+    @Override
     public SchedulingStrategy getSchedulingStrategy() {
         return schedulingStrategy;
     }
 
     @Override
-    public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
-        return serviceProvider.getControllerServiceIdentifiers(serviceType);
+    public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, String groupId) {
+        return serviceProvider.getControllerServiceIdentifiers(serviceType, groupId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java
new file mode 100644
index 0000000..3746112
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowEncodingVersion.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.w3c.dom.Element;
+
+/**
+ * Provides a mechanism for interpreting the version of the encoding scheme that was used to serialize
+ * a NiFi Flow. The versioning scheme is made up of a major version and a minor version, both being
+ * positive integers.
+ */
+public class FlowEncodingVersion {
+    public static final String ENCODING_VERSION_ATTRIBUTE = "encoding-version";
+
+    private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d)+");
+
+    private final int majorVersion;
+    private final int minorVersion;
+
+    public FlowEncodingVersion(final int majorVersion, final int minorVersion) {
+        if (majorVersion < 0) {
+            throw new IllegalArgumentException("Invalid version: Major version cannot be less than 0 but was " + majorVersion);
+        }
+        if (minorVersion < 0) {
+            throw new IllegalArgumentException("Invalid version: Minor version cannot be less than 0 but was " + minorVersion);
+        }
+
+        this.majorVersion = majorVersion;
+        this.minorVersion = minorVersion;
+    }
+
+    /**
+     * Parses the 'encoding-version' attribute of the given XML Element as FlowEncodingVersion.
+     * The attribute value is expected to be in the format &lt;major version&gt;.&lt;minor version&lt;
+     *
+     * @param xmlElement the XML Element that contains an 'encoding-version' attribute
+     * @return a FlowEncodingVersion that has the major and minor versions specified in the String, or <code>null</code> if the input is null or the input
+     *         does not have an 'encoding-version' attribute
+     *
+     * @throws IllegalArgumentException if the value is not in the format &lt;major version&gt;.&lt;minor version&gt;, if either major version or minor
+     *             version is not an integer, or if either the major or minor version is less than 0.
+     */
+    public static FlowEncodingVersion parse(final Element xmlElement) {
+        if (xmlElement == null) {
+            return null;
+        }
+
+        final String version = xmlElement.getAttribute(ENCODING_VERSION_ATTRIBUTE);
+        if (version == null) {
+            return null;
+        }
+
+        return parse(version);
+    }
+
+    /**
+     * Parses the given String as FlowEncodingVersion. The String is expected to be in the format &lt;major version&gt;.&lt;minor version&lt;
+     *
+     * @param version the String representation of the encoding version
+     * @return a FlowEncodingVersion that has the major and minor versions specified in the String, or <code>null</code> if the input is null
+     *
+     * @throws IllegalArgumentException if the value is not in the format &lt;major version&gt;.&lt;minor version&gt;, if either major version or minor
+     *             version is not an integer, or if either the major or minor version is less than 0.
+     */
+    public static FlowEncodingVersion parse(final String version) {
+        if (version == null || version.trim().isEmpty()) {
+            return null;
+        }
+
+        final Matcher matcher = VERSION_PATTERN.matcher(version.trim());
+        if (!matcher.matches()) {
+            throw new IllegalArgumentException(version + " is not a valid version for Flow serialization. Should be in format <number>.<number>");
+        }
+
+        final int majorVersion = Integer.parseInt(matcher.group(1));
+        final int minorVersion = Integer.parseInt(matcher.group(2));
+        return new FlowEncodingVersion(majorVersion, minorVersion);
+    }
+
+    public int getMajorVersion() {
+        return majorVersion;
+    }
+
+    public int getMinorVersion() {
+        return minorVersion;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
new file mode 100644
index 0000000..f6de870
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.DomUtils;
+import org.apache.nifi.web.api.dto.ConnectableDTO;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.PositionDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+public class FlowFromDOMFactory {
+
+    public static PositionDTO getPosition(final Element positionElement) {
+        if (positionElement == null) {
+            throw new IllegalArgumentException("Invalid Flow: Found no 'position' element");
+        }
+        return new PositionDTO(Double.parseDouble(positionElement.getAttribute("x")), Double.parseDouble(positionElement.getAttribute("y")));
+    }
+
+    public static Size getSize(final Element sizeElement) {
+        if (sizeElement == null) {
+            throw new IllegalArgumentException("Invalid Flow: Found no 'size' element");
+        }
+
+        return new Size(Double.parseDouble(sizeElement.getAttribute("width")), Double.parseDouble(sizeElement.getAttribute("height")));
+    }
+
+    public static Map<String, String> getStyle(final Element stylesElement) {
+        final Map<String, String> styles = new HashMap<>();
+        if (stylesElement == null) {
+            return styles;
+        }
+
+        for (final Element styleElement : getChildrenByTagName(stylesElement, "style")) {
+            final String styleName = styleElement.getAttribute("name");
+            final String styleValue = styleElement.getTextContent();
+            styles.put(styleName, styleValue);
+        }
+
+        return styles;
+    }
+
+    public static ControllerServiceDTO getControllerService(final Element element, final StringEncryptor encryptor) {
+        final ControllerServiceDTO dto = new ControllerServiceDTO();
+
+        dto.setId(getString(element, "id"));
+        dto.setName(getString(element, "name"));
+        dto.setComments(getString(element, "comment"));
+        dto.setType(getString(element, "class"));
+
+        final boolean enabled = getBoolean(element, "enabled");
+        dto.setState(enabled ? ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name());
+
+        dto.setProperties(getProperties(element, encryptor));
+        dto.setAnnotationData(getString(element, "annotationData"));
+
+        return dto;
+    }
+
+    public static ReportingTaskDTO getReportingTask(final Element element, final StringEncryptor encryptor) {
+        final ReportingTaskDTO dto = new ReportingTaskDTO();
+
+        dto.setId(getString(element, "id"));
+        dto.setName(getString(element, "name"));
+        dto.setComments(getString(element, "comment"));
+        dto.setType(getString(element, "class"));
+        dto.setSchedulingPeriod(getString(element, "schedulingPeriod"));
+        dto.setState(getString(element, "scheduledState"));
+        dto.setSchedulingStrategy(getString(element, "schedulingStrategy"));
+
+        dto.setProperties(getProperties(element, encryptor));
+        dto.setAnnotationData(getString(element, "annotationData"));
+
+        return dto;
+    }
+
+    public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) {
+        final ProcessGroupDTO dto = new ProcessGroupDTO();
+        final String groupId = getString(element, "id");
+        dto.setId(groupId);
+        dto.setParentGroupId(parentId);
+        dto.setName(getString(element, "name"));
+        dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        dto.setComments(getString(element, "comment"));
+
+        final Set<ProcessorDTO> processors = new HashSet<>();
+        final Set<ConnectionDTO> connections = new HashSet<>();
+        final Set<FunnelDTO> funnels = new HashSet<>();
+        final Set<PortDTO> inputPorts = new HashSet<>();
+        final Set<PortDTO> outputPorts = new HashSet<>();
+        final Set<LabelDTO> labels = new HashSet<>();
+        final Set<ProcessGroupDTO> processGroups = new HashSet<>();
+        final Set<RemoteProcessGroupDTO> remoteProcessGroups = new HashSet<>();
+
+        final FlowSnippetDTO groupContents = new FlowSnippetDTO();
+        groupContents.setConnections(connections);
+        groupContents.setFunnels(funnels);
+        groupContents.setInputPorts(inputPorts);
+        groupContents.setLabels(labels);
+        groupContents.setOutputPorts(outputPorts);
+        groupContents.setProcessGroups(processGroups);
+        groupContents.setProcessors(processors);
+        groupContents.setRemoteProcessGroups(remoteProcessGroups);
+
+        NodeList nodeList = DomUtils.getChildNodesByTagName(element, "processor");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            processors.add(getProcessor((Element) nodeList.item(i), encryptor));
+        }
+
+        nodeList = DomUtils.getChildNodesByTagName(element, "funnel");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            funnels.add(getFunnel((Element) nodeList.item(i)));
+        }
+
+        nodeList = DomUtils.getChildNodesByTagName(element, "inputPort");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            inputPorts.add(getPort((Element) nodeList.item(i)));
+        }
+
+        nodeList = DomUtils.getChildNodesByTagName(element, "outputPort");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            outputPorts.add(getPort((Element) nodeList.item(i)));
+        }
+
+        nodeList = DomUtils.getChildNodesByTagName(element, "label");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            labels.add(getLabel((Element) nodeList.item(i)));
+        }
+
+        nodeList = DomUtils.getChildNodesByTagName(element, "processGroup");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor, encodingVersion));
+        }
+
+        nodeList = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            remoteProcessGroups.add(getRemoteProcessGroup((Element) nodeList.item(i)));
+        }
+
+        nodeList = DomUtils.getChildNodesByTagName(element, "connection");
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            connections.add(getConnection((Element) nodeList.item(i)));
+        }
+
+        dto.setContents(groupContents);
+        return dto;
+    }
+
+    public static ConnectionDTO getConnection(final Element element) {
+        final ConnectionDTO dto = new ConnectionDTO();
+        dto.setId(getString(element, "id"));
+        dto.setName(getString(element, "name"));
+        dto.setLabelIndex(getOptionalInt(element, "labelIndex"));
+        dto.setzIndex(getOptionalLong(element, "zIndex"));
+
+        final List<PositionDTO> bends = new ArrayList<>();
+        final Element bendPointsElement = DomUtils.getChild(element, "bendPoints");
+        if (bendPointsElement != null) {
+            for (final Element bendPointElement : getChildrenByTagName(bendPointsElement, "bendPoint")) {
+                final PositionDTO bend = getPosition(bendPointElement);
+                bends.add(bend);
+            }
+        }
+        dto.setBends(bends);
+
+        final ConnectableDTO sourceConnectable = new ConnectableDTO();
+        dto.setSource(sourceConnectable);
+        sourceConnectable.setId(getString(element, "sourceId"));
+        sourceConnectable.setGroupId(getString(element, "sourceGroupId"));
+        sourceConnectable.setType(getString(element, "sourceType"));
+
+        final ConnectableDTO destConnectable = new ConnectableDTO();
+        dto.setDestination(destConnectable);
+        destConnectable.setId(getString(element, "destinationId"));
+        destConnectable.setGroupId(getString(element, "destinationGroupId"));
+        destConnectable.setType(getString(element, "destinationType"));
+
+        final Set<String> relationships = new HashSet<>();
+        final List<Element> relationshipNodeList = getChildrenByTagName(element, "relationship");
+        for (final Element relationshipElem : relationshipNodeList) {
+            relationships.add(relationshipElem.getTextContent());
+        }
+        dto.setSelectedRelationships(relationships);
+
+        dto.setBackPressureObjectThreshold(getLong(element, "maxWorkQueueSize"));
+
+        final String maxDataSize = getString(element, "maxWorkQueueDataSize");
+        if (maxDataSize != null && !maxDataSize.trim().isEmpty()) {
+            dto.setBackPressureDataSizeThreshold(maxDataSize);
+        }
+
+        String expiration = getString(element, "flowFileExpiration");
+        if (expiration == null) {
+            expiration = "0 sec";
+        }
+        dto.setFlowFileExpiration(expiration);
+
+        final List<String> prioritizerClasses = new ArrayList<>();
+        final List<Element> prioritizerNodeList = getChildrenByTagName(element, "queuePrioritizerClass");
+        for (final Element prioritizerElement : prioritizerNodeList) {
+            prioritizerClasses.add(prioritizerElement.getTextContent().trim());
+        }
+        dto.setPrioritizers(prioritizerClasses);
+
+        return dto;
+    }
+
+    public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element) {
+        final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
+        dto.setId(getString(element, "id"));
+        dto.setName(getString(element, "name"));
+        dto.setTargetUri(getString(element, "url"));
+        dto.setTransmitting(getBoolean(element, "transmitting"));
+        dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        dto.setCommunicationsTimeout(getString(element, "timeout"));
+        dto.setComments(getString(element, "comment"));
+
+        return dto;
+    }
+
+    public static LabelDTO getLabel(final Element element) {
+        final LabelDTO dto = new LabelDTO();
+        dto.setId(getString(element, "id"));
+        dto.setLabel(getString(element, "value"));
+        dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        final Size size = getSize(DomUtils.getChild(element, "size"));
+        dto.setWidth(size.getWidth());
+        dto.setHeight(size.getHeight());
+        dto.setStyle(getStyle(DomUtils.getChild(element, "styles")));
+
+        return dto;
+    }
+
+    public static FunnelDTO getFunnel(final Element element) {
+        final FunnelDTO dto = new FunnelDTO();
+        dto.setId(getString(element, "id"));
+        dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
+
+        return dto;
+    }
+
+    public static PortDTO getPort(final Element element) {
+        final PortDTO portDTO = new PortDTO();
+        portDTO.setId(getString(element, "id"));
+        portDTO.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        portDTO.setName(getString(element, "name"));
+        portDTO.setComments(getString(element, "comments"));
+        final ScheduledState scheduledState = getScheduledState(element);
+        portDTO.setState(scheduledState.toString());
+
+        final List<Element> maxTasksElements = getChildrenByTagName(element, "maxConcurrentTasks");
+        if (!maxTasksElements.isEmpty()) {
+            portDTO.setConcurrentlySchedulableTaskCount(Integer.parseInt(maxTasksElements.get(0).getTextContent()));
+        }
+
+        final List<Element> userAccessControls = getChildrenByTagName(element, "userAccessControl");
+        if (userAccessControls != null && !userAccessControls.isEmpty()) {
+            final Set<String> users = new HashSet<>();
+            portDTO.setUserAccessControl(users);
+            for (final Element userElement : userAccessControls) {
+                users.add(userElement.getTextContent());
+            }
+        }
+
+        final List<Element> groupAccessControls = getChildrenByTagName(element, "groupAccessControl");
+        if (groupAccessControls != null && !groupAccessControls.isEmpty()) {
+            final Set<String> groups = new HashSet<>();
+            portDTO.setGroupAccessControl(groups);
+            for (final Element groupElement : groupAccessControls) {
+                groups.add(groupElement.getTextContent());
+            }
+        }
+
+        return portDTO;
+    }
+
+    public static RemoteProcessGroupPortDescriptor getRemoteProcessGroupPort(final Element element) {
+        final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
+
+        // What we have serialized is the ID of the Remote Process Group, followed by a dash ('-'), followed by
+        // the actual ID of the port; we want to get rid of the remote process group id.
+        String id = getString(element, "id");
+        if (id.length() > 37) {
+            id = id.substring(37);
+        }
+
+        descriptor.setId(id);
+        descriptor.setName(getString(element, "name"));
+        descriptor.setComments(getString(element, "comments"));
+        descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
+        descriptor.setUseCompression(getBoolean(element, "useCompression"));
+        descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState")));
+
+        return descriptor;
+    }
+
+    public static ProcessorDTO getProcessor(final Element element, final StringEncryptor encryptor) {
+        final ProcessorDTO dto = new ProcessorDTO();
+
+        dto.setId(getString(element, "id"));
+        dto.setName(getString(element, "name"));
+        dto.setType(getString(element, "class"));
+        dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        dto.setStyle(getStyle(DomUtils.getChild(element, "styles")));
+
+        final ProcessorConfigDTO configDto = new ProcessorConfigDTO();
+        dto.setConfig(configDto);
+        configDto.setComments(getString(element, "comment"));
+        configDto.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
+        final String schedulingPeriod = getString(element, "schedulingPeriod");
+        configDto.setSchedulingPeriod(schedulingPeriod);
+        configDto.setPenaltyDuration(getString(element, "penalizationPeriod"));
+        configDto.setYieldDuration(getString(element, "yieldPeriod"));
+        configDto.setBulletinLevel(getString(element, "bulletinLevel"));
+        configDto.setLossTolerant(getBoolean(element, "lossTolerant"));
+        final ScheduledState scheduledState = getScheduledState(element);
+        dto.setState(scheduledState.toString());
+
+        // handle scheduling strategy
+        final String schedulingStrategyName = getString(element, "schedulingStrategy");
+        if (schedulingStrategyName == null || schedulingStrategyName.trim().isEmpty()) {
+            configDto.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN.name());
+        } else {
+            configDto.setSchedulingStrategy(schedulingStrategyName.trim());
+        }
+
+        final Long runDurationNanos = getOptionalLong(element, "runDurationNanos");
+        if (runDurationNanos != null) {
+            configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos));
+        }
+
+        configDto.setProperties(getProperties(element, encryptor));
+        configDto.setAnnotationData(getString(element, "annotationData"));
+
+        final Set<String> autoTerminatedRelationships = new HashSet<>();
+        final List<Element> autoTerminateList = getChildrenByTagName(element, "autoTerminatedRelationship");
+        for (final Element autoTerminateElement : autoTerminateList) {
+            autoTerminatedRelationships.add(autoTerminateElement.getTextContent());
+        }
+        configDto.setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+        return dto;
+    }
+
+    private static LinkedHashMap<String, String> getProperties(final Element element, final StringEncryptor encryptor) {
+        final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
+        final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
+        for (final Element propertyElement : propertyNodeList) {
+            final String name = getString(propertyElement, "name");
+            final String value = decrypt(getString(propertyElement, "value"), encryptor);
+            properties.put(name, value);
+        }
+        return properties;
+    }
+
+    private static String getString(final Element element, final String childElementName) {
+        final List<Element> nodeList = getChildrenByTagName(element, childElementName);
+        if (nodeList == null || nodeList.isEmpty()) {
+            return null;
+        }
+        final Element childElement = nodeList.get(0);
+        return childElement.getTextContent();
+    }
+
+    private static Integer getOptionalInt(final Element element, final String childElementName) {
+        final List<Element> nodeList = getChildrenByTagName(element, childElementName);
+        if (nodeList == null || nodeList.isEmpty()) {
+            return null;
+        }
+        final Element childElement = nodeList.get(0);
+        final String val = childElement.getTextContent();
+        if (val == null) {
+            return null;
+        }
+        return Integer.parseInt(val);
+    }
+
+    private static Long getOptionalLong(final Element element, final String childElementName) {
+        final List<Element> nodeList = getChildrenByTagName(element, childElementName);
+        if (nodeList == null || nodeList.isEmpty()) {
+            return null;
+        }
+        final Element childElement = nodeList.get(0);
+        final String val = childElement.getTextContent();
+        if (val == null) {
+            return null;
+        }
+        return Long.parseLong(val);
+    }
+
+    private static int getInt(final Element element, final String childElementName) {
+        return Integer.parseInt(getString(element, childElementName));
+    }
+
+    private static long getLong(final Element element, final String childElementName) {
+        return Long.parseLong(getString(element, childElementName));
+    }
+
+    private static boolean getBoolean(final Element element, final String childElementName) {
+        return Boolean.parseBoolean(getString(element, childElementName));
+    }
+
+    private static ScheduledState getScheduledState(final Element element) {
+        return ScheduledState.valueOf(getString(element, "scheduledState"));
+    }
+
+    private static List<Element> getChildrenByTagName(final Element element, final String childElementName) {
+        return DomUtils.getChildElementsByTagName(element, childElementName);
+    }
+
+    private static String decrypt(final String value, final StringEncryptor encryptor) {
+        if (value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX)) {
+            return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length()));
+        } else {
+            return value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializationException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializationException.java
new file mode 100644
index 0000000..c552c39
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializationException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+/**
+ * Represents the exceptional case when flow configuration is malformed and therefore, cannot be serialized or deserialized.
+ *
+ */
+public class FlowSerializationException extends RuntimeException {
+
+    private static final long serialVersionUID = 128934798237L;
+
+    public FlowSerializationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    public FlowSerializationException(Throwable cause) {
+        super(cause);
+    }
+
+    public FlowSerializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FlowSerializationException(String message) {
+        super(message);
+    }
+
+    public FlowSerializationException() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java
new file mode 100644
index 0000000..dae7566
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import java.io.OutputStream;
+
+import org.apache.nifi.controller.FlowController;
+
+/**
+ * Serializes the flow configuration of a controller instance to an output stream.
+ *
+ */
+public interface FlowSerializer {
+
+    public static final String ENC_PREFIX = "enc{";
+    public static final String ENC_SUFFIX = "}";
+
+    /**
+     * Serializes the flow configuration of a controller instance.
+     *
+     * @param controller a controller
+     * @param os an output stream to write the configuration to
+     *
+     * @throws FlowSerializationException if serialization failed
+     */
+    void serialize(FlowController controller, OutputStream os) throws FlowSerializationException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationException.java
new file mode 100644
index 0000000..33b4d3c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.cluster.ConnectionException;
+
+/**
+ * Represents the exceptional case when a controller managing an existing flow fails to fully load a different flow.
+ *
+ */
+public class FlowSynchronizationException extends ConnectionException {
+
+    private static final long serialVersionUID = 109234802938L;
+
+    public FlowSynchronizationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    public FlowSynchronizationException(Throwable cause) {
+        super(cause);
+    }
+
+    public FlowSynchronizationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FlowSynchronizationException(String message) {
+        super(message);
+    }
+
+    public FlowSynchronizationException() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25e7f314/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
new file mode 100644
index 0000000..86614af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.UninheritableFlowException;
+import org.apache.nifi.encrypt.StringEncryptor;
+
+/**
+ */
+public interface FlowSynchronizer {
+
+    /**
+     * Synchronizes the given controller with the given flow configuration. If loading the proposed flow configuration would cause the controller to orphan flow files, then an
+     * UninheritableFlowException is thrown.
+     *
+     * If the FlowSynchronizationException is thrown, then the controller may have changed some of its state and should no longer be used.
+     *
+     * @param controller the flow controller
+     * @param dataFlow the flow to load the controller with. If the flow is null or zero length, then the controller must not have a flow or else an UninheritableFlowException will be thrown.
+     * @param encryptor used for the encryption/decryption of sensitive property values
+     *
+     * @throws FlowSerializationException if proposed flow is not a valid flow configuration file
+     * @throws UninheritableFlowException if the proposed flow cannot be loaded by the controller because in doing so would risk orphaning flow files
+     * @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used
+     */
+    void sync(FlowController controller, DataFlow dataFlow, StringEncryptor encryptor)
+            throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException;
+
+}


Mime
View raw message