nifi-commits mailing list archives

From mcgil...@apache.org
Subject [16/26] incubator-nifi git commit: NIFI-410 pushed RAT exclusions down to the relevant modules and stopped doing broad test resource exclusion - is now specific
Date Tue, 17 Mar 2015 13:49:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
deleted file mode 100644
index f9dfb00..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.event;
-
-import java.util.List;
-
-/**
- * Manages an ordered list of events. The event history size dictates the total
- * number of events to manage for a given source at a given time. When the size
- * is exceeded, the oldest event for that source is evicted.
- *
- * @author unattributed
- */
-public interface EventManager {
-
-    /**
-     * Adds an event to the manager.
-     *
-     * @param event an Event
-     */
-    void addEvent(Event event);
-
-    /**
-     * Returns a list of events for a given source sorted by the event's
-     * timestamp where the most recent event is first in the list.
-     *
-     * @param eventSource the source
-     *
-     * @return the list of events
-     */
-    List<Event> getEvents(String eventSource);
-
-    /**
-     * Returns the most recent event for the source.  If no events exist, then
-     * null is returned.
-     *
-     * @param eventSource the source
-     *
-     * @return the most recent event, or null if no events exist
-     */
-    Event getMostRecentEvent(String eventSource);
-
-    /**
-     * Clears all events for the given source.
-     *
-     * @param eventSource the source
-     */
-    void clearEventHistory(String eventSource);
-
-    /**
-     * Returns the history size.
-     *
-     * @return the history size
-     */
-    int getEventHistorySize();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
deleted file mode 100644
index 7fadc78..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.event.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import org.apache.nifi.cluster.event.Event;
-import org.apache.nifi.cluster.event.EventManager;
-
-/**
- * Implements the EventManager.
- *
- * @author unattributed
- */
-public class EventManagerImpl implements EventManager {
-
-    /**
-     * associates the source ID with an ordered queue of events, ordered by most
-     * recent event
-     */
-    private final Map<String, Queue<Event>> eventsMap = new HashMap<>();
-
-    /**
-     * the number of events to maintain for a given source
-     */
-    private final int eventHistorySize;
-
-    /**
-     * Creates an instance.
-     *
-     * @param eventHistorySize the number of events to manage for a given
-     * source. Value must be positive.
-     */
-    public EventManagerImpl(final int eventHistorySize) {
-        if (eventHistorySize <= 0) {
-            throw new IllegalArgumentException("Event history size must be positive: " + eventHistorySize);
-        }
-        this.eventHistorySize = eventHistorySize;
-    }
-
-    @Override
-    public void addEvent(final Event event) {
-
-        if (event == null) {
-            throw new IllegalArgumentException("Event may not be null.");
-        }
-
-        Queue<Event> events = eventsMap.get(event.getSource());
-        if (events == null) {
-            // no events from this source, so add a new queue to the map
-            events = new PriorityQueue<>(eventHistorySize, createEventComparator());
-            eventsMap.put(event.getSource(), events);
-        }
-
-        // add event
-        events.add(event);
-
-        // if we exceeded the history size, then evict the oldest event
-        if (events.size() > eventHistorySize) {
-            removeOldestEvent(events);
-        }
-
-    }
-
-    @Override
-    public List<Event> getEvents(final String eventSource) {
-        final Queue<Event> events = eventsMap.get(eventSource);
-        if (events == null) {
-            return Collections.emptyList();
-        } else {
-            return Collections.unmodifiableList(new ArrayList<>(events));
-        }
-    }
-
-    @Override
-    public int getEventHistorySize() {
-        return eventHistorySize;
-    }
-
-    @Override
-    public Event getMostRecentEvent(final String eventSource) {
-        final Queue<Event> events = eventsMap.get(eventSource);
-        if (events == null) {
-            return null;
-        } else {
-            return events.peek();
-        }
-    }
-
-    @Override
-    public void clearEventHistory(final String eventSource) {
-        eventsMap.remove(eventSource);
-    }
-
-    private Comparator<Event> createEventComparator() {
-        return new Comparator<Event>() {
-            @Override
-            public int compare(final Event o1, final Event o2) {
-                // orders events by most recent first; Long.compare avoids int overflow
-                return Long.compare(o2.getTimestamp(), o1.getTimestamp());
-            }
-        };
-    }
-
-    private void removeOldestEvent(final Collection<Event> events) {
-
-        if (events.isEmpty()) {
-            return;
-        }
-
-        Event oldestEvent = null;
-        for (final Event event : events) {
-            if (oldestEvent == null || oldestEvent.getTimestamp() > event.getTimestamp()) {
-                oldestEvent = event;
-            }
-        }
-
-        events.remove(oldestEvent);
-
-    }
-
-}
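
For illustration, a minimal usage sketch (not part of this commit) of the EventManagerImpl removed above. The Event class itself is not shown in this diff, so the events are passed in as parameters rather than constructed; the class and method names below are made up, and the sketch relies only on the EventManager methods plus Event#getSource() and Event#getTimestamp().

import java.util.List;

import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.EventManager;
import org.apache.nifi.cluster.event.impl.EventManagerImpl;

public class EventHistoryExample {

    /** Records three events for one source and reads the history back. Assumes all three share the same source. */
    public static void record(final Event first, final Event second, final Event third) {
        final EventManager manager = new EventManagerImpl(2); // keep at most 2 events per source

        manager.addEvent(first);
        manager.addEvent(second);
        manager.addEvent(third); // exceeds the history size, so the oldest of the three is evicted

        final String source = third.getSource();
        final List<Event> history = manager.getEvents(source);   // most recent first
        final Event latest = manager.getMostRecentEvent(source); // the most recent event for this source
        System.out.println(history.size() + " events; latest at " + latest.getTimestamp());
    }
}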

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
deleted file mode 100644
index 2e3d278..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.firewall;
-
-/**
- * Defines the interface for restricting external client connections to a set of
- * hosts or IPs.
- */
-public interface ClusterNodeFirewall {
-
-    /**
-     * Returns true if the given host or IP is permissible through the firewall;
-     * false otherwise.
-     *
-     * If an IP is given, then it must be formatted in dotted decimal notation.
-     *
-     * @param hostOrIp the host name or dotted-decimal IP address to check
-     * @return true if the host or IP is permissible; false otherwise
-     */
-    boolean isPermissible(String hostOrIp);
-
-}
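
As a brief illustration (not part of this commit), a custom ClusterNodeFirewall only needs to supply isPermissible(). The sketch below, with a made-up class name, permits loopback addresses and blocks everything else; the file-based implementation removed in the next hunk is the one NiFi actually shipped.

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;

public class LoopbackOnlyFirewall implements ClusterNodeFirewall {

    @Override
    public boolean isPermissible(final String hostOrIp) {
        try {
            // resolve the host name or dotted-decimal IP and permit only loopback addresses
            return InetAddress.getByName(hostOrIp).isLoopbackAddress();
        } catch (final UnknownHostException uhe) {
            return false; // unknown hosts are blocked
        }
    }
}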

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
deleted file mode 100644
index 916ec14..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.firewall.impl;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import org.apache.commons.net.util.SubnetUtils;
-import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
-import org.apache.nifi.util.file.FileUtils;
-import org.apache.nifi.logging.NiFiLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A file-based implementation of the ClusterNodeFirewall interface. The class is
- * configured with a file. If the file is empty, then everything is permissible.
- * Otherwise, the file should contain hostnames or IPs formatted as dotted
- * decimals with an optional CIDR suffix. Each entry must be separated by a
- * newline. An example configuration is given below:
- *
- * <code>
- * # hash character is a comment delimiter
- * 1.2.3.4         # exact IP
- * some.host.name  # a host name
- * 4.5.6.7/8       # range of CIDR IPs
- * 9.10.11.12/13   # a smaller range of CIDR IPs
- * </code>
- *
- * This class allows for synchronization with an optionally configured restore
- * directory. If configured, then at startup, if either the config file or the
- * restore directory's copy is missing, the configuration file is copied to the
- * appropriate location. If the restore directory contains a copy that differs
- * in content from the configuration file, then an exception is thrown at
- * construction time.
- */
-public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall {
-
-    private final File config;
-
-    private final File restoreDirectory;
-
-    private final Collection<SubnetUtils.SubnetInfo> subnetInfos = new ArrayList<>();
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(FileBasedClusterNodeFirewall.class));
-
-    public FileBasedClusterNodeFirewall(final File config) throws IOException {
-        this(config, null);
-    }
-
-    public FileBasedClusterNodeFirewall(final File config, final File restoreDirectory) throws IOException {
-
-        if (config == null) {
-            throw new IllegalArgumentException("Firewall configuration file may not be null.");
-        }
-
-        this.config = config;
-        this.restoreDirectory = restoreDirectory;
-
-        if (restoreDirectory != null) {
-            // synchronize with restore directory
-            try {
-                syncWithRestoreDirectory();
-            } catch (final IOException ioe) {
-                throw new RuntimeException(ioe);
-            }
-        }
-
-        if (!config.exists() && !config.createNewFile()) {
-            throw new IOException("Firewall configuration file did not exist and could not be created: " + config.getAbsolutePath());
-        }
-
-        logger.info("Loading cluster firewall configuration.");
-        parseConfig(config);
-        logger.info("Cluster firewall configuration loaded.");
-    }
-
-    @Override
-    public boolean isPermissible(final String hostOrIp) {
-        try {
-
-            // if no rules, then permit everything
-            if (subnetInfos.isEmpty()) {
-                return true;
-            }
-
-            final String ip;
-            try {
-                ip = InetAddress.getByName(hostOrIp).getHostAddress();
-            } catch (final UnknownHostException uhe) {
-                logger.warn("Blocking unknown host: " + hostOrIp, uhe);
-                return false;
-            }
-
-            // check each subnet to see if IP is in range
-            for (final SubnetUtils.SubnetInfo subnetInfo : subnetInfos) {
-                if (subnetInfo.isInRange(ip)) {
-                    return true;
-                }
-            }
-
-            // no match
-            return false;
-
-        } catch (final IllegalArgumentException iae) {
-            return false;
-        }
-    }
-
-    private void syncWithRestoreDirectory() throws IOException {
-
-        // sanity check that restore directory is a directory, creating it if necessary
-        FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
-
-        // check that restore directory is not the same as the primary directory
-        if (config.getParentFile().getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
-            throw new IllegalStateException(
-                    String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ",
-                            config.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
-        }
-
-        // the restore copy will have same file name, but reside in a different directory
-        final File restoreFile = new File(restoreDirectory, config.getName());
-
-        // sync the primary copy with the restore copy
-        FileUtils.syncWithRestore(config, restoreFile, logger);
-
-    }
-
-    private void parseConfig(final File config) throws IOException {
-
-        // clear old information
-        subnetInfos.clear();
-        try (BufferedReader br = new BufferedReader(new FileReader(config))) {
-
-            String ipOrHostLine;
-            String ipCidr;
-            int totalIpsAdded = 0;
-            while ((ipOrHostLine = br.readLine()) != null) {
-
-                // cleanup whitespace
-                ipOrHostLine = ipOrHostLine.trim();
-
-                if (ipOrHostLine.isEmpty() || ipOrHostLine.startsWith("#")) {
-                    // skip empty lines or comments
-                    continue;
-                } else if (ipOrHostLine.contains("#")) {
-                    // parse out comments in IP containing lines
-                    ipOrHostLine = ipOrHostLine.substring(0, ipOrHostLine.indexOf("#")).trim();
-                }
-
-                // if given a complete IP, then convert to CIDR
-                if (ipOrHostLine.contains("/")) {
-                    ipCidr = ipOrHostLine;
-                } else if (ipOrHostLine.contains("\\")) {
-                    logger.warn("CIDR IP notation uses forward slashes '/'.  Replacing backslash '\\' with forward slash '/' for '" + ipOrHostLine + "'");
-                    ipCidr = ipOrHostLine.replace("\\", "/");
-                } else {
-                    try {
-                        ipCidr = InetAddress.getByName(ipOrHostLine).getHostAddress();
-                        if (!ipOrHostLine.equals(ipCidr)) {
-                            logger.debug(String.format("Resolved host '%s' to ip '%s'", ipOrHostLine, ipCidr));
-                        }
-                        ipCidr += "/32";
-                        logger.debug("Adding CIDR to exact IP: " + ipCidr);
-                    } catch (final UnknownHostException uhe) {
-                        logger.warn("Firewall is skipping unknown host address: " + ipOrHostLine);
-                        continue;
-                    }
-                }
-
-                try {
-                    logger.debug("Adding CIDR IP to firewall: " + ipCidr);
-                    final SubnetUtils subnetUtils = new SubnetUtils(ipCidr);
-                    subnetUtils.setInclusiveHostCount(true);
-                    subnetInfos.add(subnetUtils.getInfo());
-                    totalIpsAdded++;
-                } catch (final IllegalArgumentException iae) {
-                    logger.warn("Firewall is skipping invalid CIDR address: " + ipOrHostLine);
-                }
-
-            }
-
-            if (totalIpsAdded == 0) {
-                logger.info("No IPs added to firewall.  Firewall will accept all requests.");
-            } else {
-                logger.info(String.format("Added %d IP(s) to firewall.  Only requests originating from the configured IPs will be accepted.", totalIpsAdded));
-            }
-
-        }
-    }
-}
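
A minimal wiring sketch (not part of this commit) for the FileBasedClusterNodeFirewall removed above, using a config file in the format documented in its class Javadoc. The class name, file location, and addresses are illustrative only.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;

import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall;

public class FirewallWiringExample {

    public static void main(final String[] args) throws IOException {
        // one entry per line; '#' starts a comment and CIDR suffixes are optional
        final File config = File.createTempFile("cluster-firewall", ".txt");
        Files.write(config.toPath(), Arrays.asList(
                "10.0.0.5          # a single node",
                "192.168.1.0/24    # an entire subnet"));

        final ClusterNodeFirewall firewall = new FileBasedClusterNodeFirewall(config);
        System.out.println(firewall.isPermissible("10.0.0.5"));     // true - exact match
        System.out.println(firewall.isPermissible("192.168.1.42")); // true - inside the /24
        System.out.println(firewall.isPermissible("172.16.0.1"));   // false - no rule matches
    }
}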

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
deleted file mode 100644
index eedb88f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-
-/**
- * A dataflow with additional information about the cluster.
- *
- * @author unattributed
- */
-public class ClusterDataFlow {
-
-    private final StandardDataFlow dataFlow;
-
-    private final NodeIdentifier primaryNodeId;
-
-    public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) {
-        this.dataFlow = dataFlow;
-        this.primaryNodeId = primaryNodeId;
-    }
-
-    public NodeIdentifier getPrimaryNodeId() {
-        return primaryNodeId;
-    }
-
-    public StandardDataFlow getDataFlow() {
-        return dataFlow;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
deleted file mode 100644
index 6ff15a7..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * A base exception for data access exceptions.
- *
- * @author unattributed
- */
-public class DaoException extends RuntimeException {
-
-    public DaoException() {
-    }
-
-    public DaoException(String msg) {
-        super(msg);
-    }
-
-    public DaoException(Throwable cause) {
-        super(cause);
-    }
-
-    public DaoException(String msg, Throwable cause) {
-        super(msg, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
deleted file mode 100644
index a273704..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * A data access object for loading and saving the flow managed by the cluster.
- *
- * @author unattributed
- */
-public interface DataFlowDao {
-
-    /**
-     * Loads the cluster's dataflow.
-     *
-     * @return the dataflow or null if no dataflow exists
-     *
-     * @throws DaoException if the dataflow was unable to be loaded
-     */
-    ClusterDataFlow loadDataFlow() throws DaoException;
-
-    /**
-     * Saves the cluster's dataflow.
-     *
-     * @param dataFlow the dataflow to save
-     * @throws DaoException if the dataflow was unable to be saved
-     */
-    void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException;
-
-    /**
-     * Sets the state of the dataflow. If the dataflow does not exist, then an
-     * exception is thrown.
-     *
-     * @param flowState the state of the dataflow
-     *
-     * @throws DaoException if the state was unable to be updated
-     */
-    void setPersistedFlowState(PersistedFlowState flowState) throws DaoException;
-
-    /**
-     * Gets the state of the dataflow.
-     *
-     * @return the state of the dataflow
-     *
-     * @throws DaoException if the state was unable to be retrieved
-     */
-    PersistedFlowState getPersistedFlowState() throws DaoException;
-}
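
To make the contract above concrete, here is a minimal in-memory sketch (not part of this commit) of a DataFlowDao; the class name is made up and it exists purely to illustrate the expected semantics. The file/tar-backed DataFlowDaoImpl removed later in this commit is the real implementation.

import org.apache.nifi.cluster.flow.ClusterDataFlow;
import org.apache.nifi.cluster.flow.DaoException;
import org.apache.nifi.cluster.flow.DataFlowDao;
import org.apache.nifi.cluster.flow.PersistedFlowState;

public class InMemoryDataFlowDao implements DataFlowDao {

    private ClusterDataFlow dataFlow;
    private PersistedFlowState flowState = PersistedFlowState.UNKNOWN;

    @Override
    public synchronized ClusterDataFlow loadDataFlow() throws DaoException {
        return dataFlow; // null when no dataflow has been saved yet
    }

    @Override
    public synchronized void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException {
        this.dataFlow = dataFlow;
        this.flowState = PersistedFlowState.CURRENT; // a freshly saved flow is current
    }

    @Override
    public synchronized void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException {
        if (dataFlow == null) {
            throw new DaoException("No dataflow exists, so its state cannot be set.");
        }
        this.flowState = flowState;
    }

    @Override
    public synchronized PersistedFlowState getPersistedFlowState() throws DaoException {
        return flowState;
    }
}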

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
deleted file mode 100644
index 339d904..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-import java.util.Set;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-/**
- * A service for managing the cluster's flow. The service will attempt to keep
- * the cluster's dataflow current while respecting the value of the configured
- * retrieval delay.
- *
- * The eligible retrieval time is reset with the configured delay every time the
- * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then
- * the flow will not be retrieved.
- *
- * Clients must call start() and stop() to initialize and stop the instance.
- *
- * @author unattributed
- */
-public interface DataFlowManagementService {
-
-    /**
-     * Starts the instance. Start may only be called if the instance is not
-     * running.
-     */
-    void start();
-
-    /**
-     * Stops the instance. Stop may only be called if the instance is running.
-     */
-    void stop();
-
-    /**
-     * @return true if the instance is started; false otherwise.
-     */
-    boolean isRunning();
-
-    /**
-     * Loads the dataflow.
-     *
-     * @return the dataflow or null if no dataflow exists
-     */
-    ClusterDataFlow loadDataFlow();
-
-    /**
-     * Updates the dataflow with the given primary node identifier.
-     *
-     * @param nodeId the node identifier
-     *
-     * @throws DaoException if the update failed
-     */
-    void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
-
-    /**
-     * Sets the state of the flow.
-     *
-     * @param flowState the state
-     *
-     * @see PersistedFlowState
-     */
-    void setPersistedFlowState(PersistedFlowState flowState);
-
-    /**
-     * @return the state of the flow
-     */
-    PersistedFlowState getPersistedFlowState();
-
-    /**
-     * @return true if the flow is current; false otherwise.
-     */
-    boolean isFlowCurrent();
-
-    /**
-     * Sets the node identifiers to use when attempting to retrieve the flow.
-     *
-     * @param nodeIds the node identifiers
-     */
-    void setNodeIds(Set<NodeIdentifier> nodeIds);
-
-    /**
-     * Returns the set of node identifiers the service is using to retrieve the
-     * flow.
-     *
-     * @return the set of node identifiers the service is using to retrieve the
-     * flow.
-     */
-    Set<NodeIdentifier> getNodeIds();
-
-    /**
-     * @return the retrieval delay in seconds
-     */
-    int getRetrievalDelaySeconds();
-
-    /**
-     * Sets the retrieval delay.
-     *
-     * @param delay the retrieval delay in seconds
-     */
-    void setRetrievalDelay(String delay);
-}
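
A short usage sketch (not part of this commit) of the lifecycle described above. The class name is made up, the concrete service instance and node identifiers are passed in because their construction is outside this interface, and the retrieval-delay value is illustrative since its exact format is not shown in this diff.

import java.util.Set;

import org.apache.nifi.cluster.flow.ClusterDataFlow;
import org.apache.nifi.cluster.flow.DataFlowManagementService;
import org.apache.nifi.cluster.flow.PersistedFlowState;
import org.apache.nifi.cluster.protocol.NodeIdentifier;

public class FlowRefreshExample {

    public static ClusterDataFlow refresh(final DataFlowManagementService service,
                                          final Set<NodeIdentifier> nodeIds) {
        if (!service.isRunning()) {
            service.start(); // clients must start the service before using it
        }

        service.setNodeIds(nodeIds);                              // where the flow may be retrieved from
        service.setRetrievalDelay("5 sec");                       // illustrative value only
        service.setPersistedFlowState(PersistedFlowState.STALE);  // STALE marks the flow eligible for retrieval

        // once the service has caught up, the latest flow can be loaded
        return service.isFlowCurrent() ? service.loadDataFlow() : null;
    }
}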

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
deleted file mode 100644
index b3afc6e..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * Represents the various states of a flow managed by the cluster.
- *
- * The semantics of the values are:
- * <ul>
- * <li> CURRENT - the flow is current </li>
- * <li> STALE - the flow is not current, but is eligible to be updated. </li>
- * <li> UNKNOWN - the flow is not current and is not eligible to be updated.
- * </li>
- * </ul>
- *
- * @author unattributed
- */
-public enum PersistedFlowState {
-
-    CURRENT,
-    STALE,
-    UNKNOWN
-}
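
For illustration (not part of this commit), a caller deciding whether to retrieve the flow can follow the semantics listed above directly; the class and method names here are made up.

import org.apache.nifi.cluster.flow.PersistedFlowState;

public class FlowStateExample {

    /** Returns true only for states that make the flow eligible to be updated. */
    public static boolean shouldRetrieveFlow(final PersistedFlowState state) {
        switch (state) {
            case STALE:
                return true;   // not current, but eligible to be updated
            case CURRENT:
                return false;  // already current; nothing to retrieve
            case UNKNOWN:
            default:
                return false;  // not current and not eligible to be updated
        }
    }
}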

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
deleted file mode 100644
index ce5a08b..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * Represents the exceptional case when a caller is requesting the current flow,
- * but a current flow is not available.
- *
- * @author unattributed
- */
-public class StaleFlowException extends RuntimeException {
-
-    public StaleFlowException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public StaleFlowException(String message) {
-        super(message);
-    }
-
-    public StaleFlowException(Throwable cause) {
-        super(cause);
-    }
-
-    public StaleFlowException() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
deleted file mode 100644
index 72b594a..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.UUID;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.nifi.cluster.flow.ClusterDataFlow;
-import org.apache.nifi.cluster.flow.DaoException;
-import org.apache.nifi.cluster.flow.DataFlowDao;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
-import org.apache.nifi.cluster.protocol.DataFlow;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.file.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-
-/**
- * Implements the DataFlowDao interface. The implementation tracks the state of the
- * dataflow by annotating the filename of the flow state file. Specifically, the
- * implementation correlates PersistedFlowState states to filename extensions.
- * The correlation is as follows:
- * <ul>
- * <li> CURRENT maps to flow.xml </li>
- * <li> STALE maps to flow.xml.stale </li>
- * <li> UNKNOWN maps to flow.xml.unknown </li>
- * </ul>
- * Whenever the flow state changes, the flow state file's name is updated to
- * denote its state.
- *
- * The implementation also provides for a restore directory that may be
- * configured for higher availability. At instance creation, if the primary or
- * restore directories have multiple flow state files, an exception is thrown.
- * If the primary directory has a current flow state file, but the restore
- * directory does not, then the primary flow state file is copied to the restore
- * directory. If the restore directory has a current flow state file, but the
- * primary directory does not, then the restore flow state file is copied to the
- * primary directory. If both the primary and restore directories have a current
- * flow state file and the files are different, then an exception is thrown.
- *
- * When the flow state file is saved, it is always saved first to the restore
- * directory followed by a save to the primary directory. When the flow state
- * file is loaded, a check is made to verify that the primary and restore flow
- * state files are both current. If either is not current, then an exception is
- * thrown. The primary flow state file is always read when the load method is
- * called.
- *
- * @author unattributed
- */
-public class DataFlowDaoImpl implements DataFlowDao {
-
-    private final File primaryDirectory;
-    private final File restoreDirectory;
-    private final boolean autoStart;
-    private final String generatedRootGroupId = UUID.randomUUID().toString();
-
-    public static final String STALE_EXT = ".stale";
-    public static final String UNKNOWN_EXT = ".unknown";
-    public static final String FLOW_PACKAGE = "flow.tar";
-    public static final String FLOW_XML_FILENAME = "flow.xml";
-    public static final String TEMPLATES_FILENAME = "templates.xml";
-    public static final String SNIPPETS_FILENAME = "snippets.xml";
-    public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
-
-    public DataFlowDaoImpl(final File primaryDirectory) throws DaoException {
-        this(primaryDirectory, null, false);
-    }
-
-    public DataFlowDaoImpl(final File primaryDirectory, final File restoreDirectory, final boolean autoStart) throws DaoException {
-
-        // sanity check that primary directory is a directory, creating it if necessary
-        if (primaryDirectory == null) {
-            throw new IllegalArgumentException("Primary directory may not be null.");
-        } else if (!primaryDirectory.exists()) {
-            if (!primaryDirectory.mkdir()) {
-                throw new DaoException(String.format("Failed to create primary directory '%s'", primaryDirectory.getAbsolutePath()));
-            }
-        } else if (!primaryDirectory.isDirectory()) {
-            throw new IllegalArgumentException("Primary directory must be a directory.");
-        }
-
-        this.autoStart = autoStart;
-
-        try {
-            this.primaryDirectory = primaryDirectory;
-            this.restoreDirectory = restoreDirectory;
-
-            if (restoreDirectory == null) {
-                // check that we have exactly one current flow state file
-                ensureSingleCurrentStateFile(primaryDirectory);
-            } else {
-
-                // check that restore directory is a directory, creating it if necessary
-                FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
-
-                // check that restore directory is not the same as the primary directory
-                if (primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
-                    throw new IllegalArgumentException(String.format("Primary directory '%s' is the same as restore directory '%s' ",
-                            primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
-                }
-
-                final File[] primaryFlowStateFiles = getFlowStateFiles(primaryDirectory);
-                final File[] restoreFlowStateFiles = getFlowStateFiles(restoreDirectory);
-
-                // if more than one state file in either primary or restore, then throw exception
-                if (primaryFlowStateFiles.length > 1) {
-                    throw new IllegalStateException(String.format("Found multiple dataflow state files in primary directory '%s'", primaryDirectory));
-                } else if (restoreFlowStateFiles.length > 1) {
-                    throw new IllegalStateException(String.format("Found multiple dataflow state files in restore directory '%s'", restoreDirectory));
-                }
-
-                // check that the single primary state file we found is current or create a new one
-                final File primaryFlowStateFile = ensureSingleCurrentStateFile(primaryDirectory);
-
-                // check that the single restore state file we found is current or create a new one
-                final File restoreFlowStateFile = ensureSingleCurrentStateFile(restoreDirectory);
-
-                // if there was a difference in flow state file directories, then copy the appropriate files
-                if (restoreFlowStateFiles.length == 0 && primaryFlowStateFiles.length != 0) {
-                    // copy primary state file to restore
-                    FileUtils.copyFile(primaryFlowStateFile, restoreFlowStateFile, false, false, logger);
-                } else if (primaryFlowStateFiles.length == 0 && restoreFlowStateFiles.length != 0) {
-                    // copy restore state file to primary
-                    FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger);
-                } else {
-                    // sync the primary copy with the restore copy
-                    syncWithRestore(primaryFlowStateFile, restoreFlowStateFile);
-                }
-
-            }
-        } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) {
-            throw new DaoException(ex);
-        }
-    }
-    
-    
-    private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException {
-        try (final FileInputStream primaryFis = new FileInputStream(primaryFile);
-             final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis);
-             final FileInputStream restoreFis = new FileInputStream(restoreFile);
-             final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) {
-            
-            final ArchiveEntry primaryEntry = primaryIn.getNextEntry();
-            final ArchiveEntry restoreEntry = restoreIn.getNextEntry();
-
-            if ( primaryEntry == null && restoreEntry == null ) {
-                return;
-            }
-
-            if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) {
-                throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
-                        primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
-            }
-            
-            final byte[] primaryMd5 = calculateMd5(primaryIn);
-            final byte[] restoreMd5 = calculateMd5(restoreIn);
-            
-            if ( !Arrays.equals(primaryMd5, restoreMd5) ) {
-                throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
-                        primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
-            }
-        }
-    }
-    
-    private byte[] calculateMd5(final InputStream in) throws IOException {
-        final MessageDigest digest;
-        try {
-            digest = MessageDigest.getInstance("MD5");
-        } catch (final NoSuchAlgorithmException nsae) {
-            throw new IOException(nsae);
-        }
-        
-        int len;
-        final byte[] buffer = new byte[8192];
-        while ((len = in.read(buffer)) > -1) {
-            if (len > 0) {
-                digest.update(buffer, 0, len);
-            }
-        }
-        return digest.digest();
-    }
-
-    @Override
-    public ClusterDataFlow loadDataFlow() throws DaoException {
-        try {
-            return parseDataFlow(getExistingFlowStateFile(primaryDirectory));
-        } catch (final IOException | JAXBException ex) {
-            throw new DaoException(ex);
-        }
-    }
-
-    @Override
-    public void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException {
-        try {
-
-            final File primaryStateFile = getFlowStateFile(primaryDirectory);
-
-            // write to restore before writing to primary in case primary experiences problems
-            if (restoreDirectory != null) {
-                final File restoreStateFile = getFlowStateFile(restoreDirectory);
-                if (restoreStateFile == null) {
-                    if (primaryStateFile == null) {
-                        writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow);
-                    } else {
-                        throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'",
-                                primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
-                    }
-                } else {
-                    if (primaryStateFile == null) {
-                        throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'",
-                                restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath()));
-                    } else {
-                        final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile);
-                        final PersistedFlowState restoreFlowState = getPersistedFlowState(restoreStateFile);
-                        if (primaryFlowState == restoreFlowState) {
-                            writeDataFlow(restoreStateFile, dataFlow);
-                        } else {
-                            throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
-                                    primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState));
-                        }
-                    }
-                }
-            }
-
-            // write dataflow to primary 
-            if (primaryStateFile == null) {
-                writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow);
-            } else {
-                writeDataFlow(primaryStateFile, dataFlow);
-            }
-
-        } catch (final IOException | JAXBException ex) {
-            throw new DaoException(ex);
-        }
-    }
-
-    @Override
-    public PersistedFlowState getPersistedFlowState() {
-        // trust restore over primary if configured for restore
-        if (restoreDirectory == null) {
-            return getPersistedFlowState(getExistingFlowStateFile(primaryDirectory));
-        } else {
-            return getPersistedFlowState(getExistingFlowStateFile(restoreDirectory));
-        }
-    }
-
-    @Override
-    public void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException {
-        // rename restore before primary if configured for restore
-        if (restoreDirectory != null) {
-            renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), flowState);
-        }
-        renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), flowState);
-    }
-
-    private File ensureSingleCurrentStateFile(final File dir) throws IOException, JAXBException {
-
-        // ensure that we have at most one state file and if we have one, it is current
-        final File[] directoryFlowStateFiles = getFlowStateFiles(dir);
-        if (directoryFlowStateFiles.length > 1) {
-            throw new DaoException(String.format("Found multiple dataflow state files in directory '%s'", dir));
-        } else if (directoryFlowStateFiles.length == 0) {
-            // create a new file if none exist
-            return createNewFlowStateFile(dir);
-        } else {
-            // check that the single flow state file is current
-            final PersistedFlowState flowState = getPersistedFlowState(directoryFlowStateFiles[0]);
-            if (PersistedFlowState.CURRENT == flowState) {
-                return directoryFlowStateFiles[0];
-            } else {
-                throw new DaoException(String.format("Dataflow state file '%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath()));
-            }
-        }
-
-    }
-
-    private PersistedFlowState getPersistedFlowState(final File file) {
-        final String path = file.getAbsolutePath();
-        if (path.endsWith(STALE_EXT)) {
-            return PersistedFlowState.STALE;
-        } else if (path.endsWith(UNKNOWN_EXT)) {
-            return PersistedFlowState.UNKNOWN;
-        } else {
-            return PersistedFlowState.CURRENT;
-        }
-    }
-
-    private File getFlowStateFile(final File dir) {
-        final File[] files = getFlowStateFiles(dir);
-        if (files.length > 1) {
-            throw new IllegalStateException(String.format("Expected at most one dataflow state file, but found %s files.", files.length));
-        } else if (files.length == 0) {
-            return null;
-        } else {
-            return files[0];
-        }
-    }
-
-    private File getExistingFlowStateFile(final File dir) {
-        final File file = getFlowStateFile(dir);
-        if (file == null) {
-            throw new IllegalStateException(String.format("Expected a dataflow state file, but none existed in directory '%s'", dir.getAbsolutePath()));
-        }
-        return file;
-    }
-
-    private File[] getFlowStateFiles(final File dir) {
-        final File[] files = dir.listFiles(new FilenameFilter() {
-            @Override
-            public boolean accept(File dir, String name) {
-                return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT));
-            }
-        });
-
-        if (files == null) {
-            return new File[0];
-        } else {
-            return files;
-        }
-    }
-
-    private File removeStateFileExtension(final File file) {
-
-        final String path = file.getAbsolutePath();
-        final int stateFileExtIndex;
-        if (path.endsWith(STALE_EXT)) {
-            stateFileExtIndex = path.lastIndexOf(STALE_EXT);
-        } else if (path.endsWith(UNKNOWN_EXT)) {
-            stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT);
-        } else {
-            stateFileExtIndex = path.length();
-        }
-
-        return new File(path.substring(0, stateFileExtIndex));
-    }
-
-    private File addStateFileExtension(final File file, final PersistedFlowState state) {
-        switch (state) {
-            case CURRENT: {
-                return file;
-            }
-            case STALE: {
-                return new File(file.getAbsolutePath() + STALE_EXT);
-            }
-            case UNKNOWN: {
-                return new File(file.getAbsolutePath() + UNKNOWN_EXT);
-            }
-            default: {
-                throw new RuntimeException("Unsupported PersistedFlowState Enum value: " + state);
-            }
-        }
-    }
-
-    private File createNewFlowStateFile(final File dir) throws IOException, JAXBException {
-        final File stateFile = new File(dir, FLOW_PACKAGE);
-        stateFile.createNewFile();
-
-        final byte[] flowBytes = getEmptyFlowBytes();
-        final byte[] templateBytes = new byte[0];
-        final byte[] snippetBytes = new byte[0];
-        final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
-
-        final ClusterMetadata clusterMetadata = new ClusterMetadata();
-        writeDataFlow(stateFile, dataFlow, clusterMetadata);
-
-        return stateFile;
-    }
-
-    private byte[] getEmptyFlowBytes() throws IOException {
-        try {
-            final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
-            final Document document = docBuilder.newDocument();
-
-            final Element controller = document.createElement("flowController");
-            document.appendChild(controller);
-
-            controller.appendChild(createTextElement(document, "maxThreadCount", "15"));
-
-            final Element rootGroup = document.createElement("rootGroup");
-            rootGroup.appendChild(createTextElement(document, "id", generatedRootGroupId));
-            rootGroup.appendChild(createTextElement(document, "name", "NiFi Flow"));
-
-            // create the position element
-            final Element positionElement = createTextElement(document, "position", "");
-            positionElement.setAttribute("x", "0.0");
-            positionElement.setAttribute("y", "0.0");
-            rootGroup.appendChild(positionElement);
-
-            rootGroup.appendChild(createTextElement(document, "comment", ""));
-            controller.appendChild(rootGroup);
-
-            final Transformer transformer = TransformerFactory.newInstance().newTransformer();
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
-            final DOMSource source = new DOMSource(document);
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            final StreamResult result = new StreamResult(baos);
-            transformer.transform(source, result);
-
-            return baos.toByteArray();
-        } catch (final Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    private Element createTextElement(final Document document, final String elementName, final String value) {
-        final Element element = document.createElement(elementName);
-        element.setTextContent(value);
-        return element;
-    }
-
-    private void renameFlowStateFile(final File flowStateFile, final PersistedFlowState newState) throws DaoException {
-        final PersistedFlowState existingState = getPersistedFlowState(flowStateFile);
-        if (existingState != newState) {
-            final File newFlowStateFile = addStateFileExtension(removeStateFileExtension(flowStateFile), newState);
-            if (flowStateFile.renameTo(newFlowStateFile) == false) {
-                throw new DaoException(
-                        String.format("Failed to rename flow state file '%s' to new name '%s'", flowStateFile.getAbsolutePath(), newFlowStateFile.getAbsolutePath()));
-            }
-        }
-    }
-
-    private ClusterDataFlow parseDataFlow(final File file) throws IOException, JAXBException, DaoException {
-        byte[] flowBytes = new byte[0];
-        byte[] templateBytes = new byte[0];
-        byte[] snippetBytes = new byte[0];
-        byte[] clusterInfoBytes = new byte[0];
-
-        try (final InputStream inStream = new FileInputStream(file);
-                final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
-            TarArchiveEntry tarEntry;
-            while ((tarEntry = tarIn.getNextTarEntry()) != null) {
-                switch (tarEntry.getName()) {
-                    case FLOW_XML_FILENAME:
-                        flowBytes = new byte[(int) tarEntry.getSize()];
-                        StreamUtils.fillBuffer(tarIn, flowBytes, true);
-                        break;
-                    case TEMPLATES_FILENAME:
-                        templateBytes = new byte[(int) tarEntry.getSize()];
-                        StreamUtils.fillBuffer(tarIn, templateBytes, true);
-                        break;
-                    case SNIPPETS_FILENAME:
-                        snippetBytes = new byte[(int) tarEntry.getSize()];
-                        StreamUtils.fillBuffer(tarIn, snippetBytes, true);
-                        break;
-                    case CLUSTER_INFO_FILENAME:
-                        clusterInfoBytes = new byte[(int) tarEntry.getSize()];
-                        StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
-                        break;
-                    default:
-                        throw new DaoException("Found unexpected file in dataflow configuration: " + tarEntry.getName());
-                }
-            }
-        }
-
-        final ClusterMetadata clusterMetadata;
-        if (clusterInfoBytes.length == 0) {
-            clusterMetadata = null;
-        } else {
-            final Unmarshaller clusterMetadataUnmarshaller = ClusterMetadata.jaxbCtx.createUnmarshaller();
-            clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes));
-        }
-
-        final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
-        dataFlow.setAutoStartProcessors(autoStart);
-
-        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
-    }
-
-    private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
-
-        // get the data flow
-        DataFlow dataFlow = clusterDataFlow.getDataFlow();
-
-        // if no dataflow, then write a new dataflow
-        if (dataFlow == null) {
-            dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
-        }
-
-        // setup the cluster metadata
-        final ClusterMetadata clusterMetadata = new ClusterMetadata();
-        clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());
-
-        // write to disk
-        writeDataFlow(file, dataFlow, clusterMetadata);
-    }
-
-    private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException {
-        final TarArchiveEntry flowEntry = new TarArchiveEntry(filename);
-        flowEntry.setSize(bytes.length);
-        tarOut.putArchiveEntry(flowEntry);
-        tarOut.write(bytes);
-        tarOut.closeArchiveEntry();
-    }
-
-    private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
-
-        try (final OutputStream fos = new FileOutputStream(file);
-                final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
-
-            writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
-            writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
-            writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
-
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
-            writeClusterMetadata(clusterMetadata, baos);
-            final byte[] clusterInfoBytes = baos.toByteArray();
-
-            writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes);
-        }
-    }
-
-    private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException {
-        // write cluster metadata to output stream
-        final Marshaller marshaller = ClusterMetadata.jaxbCtx.createMarshaller();
-        marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
-        marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
-        marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
-        marshaller.marshal(clusterMetadata, os);
-    }
-
-    @XmlRootElement(name = "clusterMetadata")
-    private static class ClusterMetadata {
-
-        private NodeIdentifier primaryNodeId;
-
-        private static final JAXBContext jaxbCtx;
-
-        static {
-            try {
-                jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class);
-            } catch (final JAXBException je) {
-                throw new RuntimeException(je);
-            }
-        }
-
-        @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-        public NodeIdentifier getPrimaryNodeId() {
-            return primaryNodeId;
-        }
-
-        public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) {
-            this.primaryNodeId = primaryNodeId;
-        }
-    }
-}

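For readers skimming the removed DAO code above: the persisted flow state is encoded entirely in the state file's name, with the bare flow package name meaning CURRENT and a suffix marking STALE or UNKNOWN. The stand-alone sketch below only illustrates that naming scheme; the class name and the constant values ("flow.tar", ".stale", ".unknown") are assumptions for demonstration, since the real FLOW_PACKAGE, STALE_EXT and UNKNOWN_EXT constants are defined elsewhere in the deleted file.

    import java.io.File;

    public class FlowStateFileNamingSketch {

        enum PersistedFlowState { CURRENT, STALE, UNKNOWN }

        // Assumed values for illustration only; not the deleted DAO's actual constants.
        static final String FLOW_PACKAGE = "flow.tar";
        static final String STALE_EXT = ".stale";
        static final String UNKNOWN_EXT = ".unknown";

        /** Derives the persisted state purely from the state file's name. */
        static PersistedFlowState stateOf(final File stateFile) {
            final String path = stateFile.getAbsolutePath();
            if (path.endsWith(STALE_EXT)) {
                return PersistedFlowState.STALE;
            } else if (path.endsWith(UNKNOWN_EXT)) {
                return PersistedFlowState.UNKNOWN;
            } else {
                return PersistedFlowState.CURRENT;
            }
        }

        /** Maps a state back to the file name that encodes it. */
        static File fileFor(final File dir, final PersistedFlowState state) {
            final File base = new File(dir, FLOW_PACKAGE);
            switch (state) {
                case STALE:
                    return new File(base.getAbsolutePath() + STALE_EXT);
                case UNKNOWN:
                    return new File(base.getAbsolutePath() + UNKNOWN_EXT);
                default:
                    return base;
            }
        }

        public static void main(final String[] args) {
            final File dir = new File("/tmp/flow-config");           // hypothetical directory
            final File stale = fileFor(dir, PersistedFlowState.STALE);
            System.out.println(stale.getName() + " -> " + stateOf(stale));  // flow.tar.stale -> STALE
        }
    }
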
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
deleted file mode 100644
index e135af3..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow.impl;
-
-import java.util.Collections;
-import java.util.Date;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.nifi.cluster.flow.ClusterDataFlow;
-import org.apache.nifi.cluster.flow.DaoException;
-import org.apache.nifi.cluster.flow.DataFlowDao;
-import org.apache.nifi.cluster.flow.DataFlowManagementService;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
-import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.util.FormatUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the DataFlowManagementService interface. The service tries to keep the
- * cluster's flow current with regards to the available nodes.
- *
- * The instance may be configured with a retrieval delay, which will reduce the
- * number of retrievals performed by the service at the expense of increasing
- * the chances that the service will not be able to provide a current flow to
- * the caller.
- *
- * By default, the service will try to update the flow as quickly as possible.
- * Configuring a delay enables a less aggressive retrieval strategy.
- * Specifically, the eligible retrieval time is reset every time the flow state
- * is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow
- * will not be retrieved.
- *
- * @author unattributed
- */
-public class DataFlowManagementServiceImpl implements DataFlowManagementService {
-
-    /*
-     * Developer Note: 
-     * 
-     * This class maintains a Timer and a TimerTask (the flow retriever).
-     * Although the class is not externally threadsafe, its internals are protected to
-     * accommodate multithreaded access between the timer thread and external callers.
-     * 
-     */
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
-
-    private final DataFlowDao flowDao;
-
-    private final ClusterManagerProtocolSender sender;
-
-    private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>();
-
-    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
-
-    private final AtomicLong lastRetrievalTime = new AtomicLong(-1);
-
-    private Timer flowRetriever;
-
-    private long retrievableAfterTime = 0L;
-
-    private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0);
-
-    private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock());
-
-    public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) {
-        if (flowDao == null) {
-            throw new IllegalArgumentException("Flow DAO may not be null.");
-        } else if (sender == null) {
-            throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null.");
-        }
-        this.flowDao = flowDao;
-        this.sender = sender;
-    }
-
-    @Override
-    public void start() {
-
-        if (isRunning()) {
-            throw new IllegalArgumentException("Instance is already running.");
-        }
-
-        // reset stop requested
-        stopRequested.set(false);
-
-        // set up the flow retriever timer
-        flowRetriever = new Timer("Flow Management Service", /* is daemon */ true);
-        flowRetriever.schedule(new FlowRetrieverTimerTask(), 0, 500);
-    }
-
-    @Override
-    public boolean isRunning() {
-        return (flowRetriever != null);
-    }
-
-    @Override
-    public void stop() {
-
-        if (isRunning() == false) {
-            throw new IllegalArgumentException("Instance is already stopped.");
-        }
-
-        // record stop request
-        stopRequested.set(true);
-
-        flowRetriever.cancel();
-        flowRetriever = null;
-
-    }
-
-    @Override
-    public ClusterDataFlow loadDataFlow() throws DaoException {
-        resourceLock.lock();
-        try {
-            return flowDao.loadDataFlow();
-        } finally {
-            resourceLock.unlock("loadDataFlow");
-        }
-    }
-
-    @Override
-    public void updatePrimaryNode(final NodeIdentifier nodeId) {
-        resourceLock.lock();
-        try {
-            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
-
-            final StandardDataFlow dataFlow;
-            if (existingClusterDataFlow == null) {
-                dataFlow = null;
-            } else {
-                dataFlow = existingClusterDataFlow.getDataFlow();
-            }
-
-            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
-        } finally {
-            resourceLock.unlock("updatePrimaryNode");
-        }
-    }
-
-    @Override
-    public PersistedFlowState getPersistedFlowState() {
-        resourceLock.lock();
-        try {
-            return flowDao.getPersistedFlowState();
-        } finally {
-            resourceLock.unlock("getPersistedFlowState");
-        }
-    }
-
-    @Override
-    public boolean isFlowCurrent() {
-        return PersistedFlowState.CURRENT == getPersistedFlowState();
-    }
-
-    @Override
-    public void setPersistedFlowState(final PersistedFlowState flowState) {
-        // lock to ensure state change and retrievable time update are atomic
-        resourceLock.lock();
-        try {
-            flowDao.setPersistedFlowState(flowState);
-            if (PersistedFlowState.STALE == flowState) {
-                retrievableAfterTime = new Date().getTime() + (getRetrievalDelaySeconds() * 1000);
-            } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) {
-                retrievableAfterTime = Long.MAX_VALUE;
-            }
-        } finally {
-            resourceLock.unlock("setPersistedFlowState");
-        }
-    }
-
-    @Override
-    public Set<NodeIdentifier> getNodeIds() {
-        return Collections.unmodifiableSet(nodeIds);
-    }
-
-    @Override
-    public void setNodeIds(final Set<NodeIdentifier> nodeIds) {
-
-        if (nodeIds == null) {
-            throw new IllegalArgumentException("Node IDs may not be null.");
-        }
-
-        resourceLock.lock();
-        try {
-
-            if (this.nodeIds.equals(nodeIds)) {
-                return;
-            }
-
-            this.nodeIds.clear();
-            this.nodeIds.addAll(nodeIds);
-
-        } finally {
-            resourceLock.unlock("setNodeIds");
-        }
-
-    }
-
-    @Override
-    public int getRetrievalDelaySeconds() {
-        return retrievalDelaySeconds.get();
-    }
-
-    @Override
-    public void setRetrievalDelay(final String retrievalDelay) {
-        this.retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS));
-    }
-
-    public ClusterManagerProtocolSender getSender() {
-        return sender;
-    }
-
-    public long getLastRetrievalTime() {
-        return lastRetrievalTime.get();
-    }
-
-    /**
-     * A timer task for issuing FlowRequestMessage messages to nodes to retrieve
-     * an updated flow.
-     */
-    private class FlowRetrieverTimerTask extends TimerTask {
-
-        @Override
-        public void run() {
-
-            resourceLock.lock();
-            try {
-                // if flow is current, then we're done
-                if (isFlowCurrent()) {
-                    return;
-                }
-            } catch (final Exception ex) {
-                logger.info("Encountered exception checking if flow is current caused by " + ex, ex);
-            } finally {
-                resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent");
-            }
-
-            final FlowRequestMessage request = new FlowRequestMessage();
-            for (final NodeIdentifier nodeId : getNodeIds()) {
-                try {
-                    // setup request
-                    request.setNodeId(nodeId);
-
-                    // record request time
-                    final long requestSentTime = new Date().getTime();
-
-                    resourceLock.lock();
-                    try {
-                        // sanity checks before making request
-                        if (stopRequested.get()) {  // did we receive a stop request
-                            logger.debug("Stopping runnable prematurely because a request to stop was issued.");
-                            return;
-                        } else if (requestSentTime < retrievableAfterTime) {
-                            /*
-                             * Retrievable after time was updated while obtaining
-                             * the lock, so try again later
-                             */
-                            return;
-                        }
-                    } finally {
-                        resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
-                    }
-
-                    // send request
-                    final FlowResponseMessage response = sender.requestFlow(request);
-
-                    resourceLock.lock();
-                    try {
-                        // check if the retrieved flow is still valid
-                        if (requestSentTime > retrievableAfterTime) {
-                            logger.info("Saving retrieved flow.");
-
-                            final StandardDataFlow dataFlow = response.getDataFlow();
-                            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
-                            final ClusterDataFlow currentClusterDataFlow;
-                            if (existingClusterDataFlow == null) {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
-                            } else {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
-                            }
-                            flowDao.saveDataFlow(currentClusterDataFlow);
-                            flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
-                            lastRetrievalTime.set(new Date().getTime());
-                        }
-
-                        /*
-                         * Otherwise, the retrievable-after time was updated while
-                         * requesting the flow, so try again later.
-                         */
-                    } finally {
-                        resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow");
-                    }
-
-                } catch (final Throwable t) {
-                    logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t);
-                }
-            }
-        }
-    }
-
-    private static class TimingReentrantLock {
-
-        private final Lock lock;
-        private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock");
-
-        private final ThreadLocal<Long> lockTime = new ThreadLocal<>();
-
-        public TimingReentrantLock(final Lock lock) {
-            this.lock = lock;
-        }
-
-        public void lock() {
-            lock.lock();
-            lockTime.set(System.nanoTime());
-        }
-
-        public void unlock(final String task) {
-            final long nanosLocked = System.nanoTime() - lockTime.get();
-            lock.unlock();
-
-            final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS);
-            if (millisLocked > 100L) {
-                logger.debug("Lock held for {} milliseconds for task: {}", millisLocked, task);
-            }
-        }
-    }
-}

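The class javadoc above describes a retrieval-delay strategy: marking the flow STALE schedules a retrieval once the configured delay has elapsed, while CURRENT or UNKNOWN suppresses retrieval entirely. The self-contained sketch below shows only that gating logic; the class and method names are hypothetical and are not part of the NiFi codebase.

    import java.util.concurrent.TimeUnit;

    public class RetrievalGateSketch {

        enum PersistedFlowState { CURRENT, STALE, UNKNOWN }

        // Next time (epoch millis) at which a retrieval attempt is allowed.
        private volatile long retrievableAfterTime = Long.MAX_VALUE;
        private final long retrievalDelayMillis;

        RetrievalGateSketch(final long retrievalDelaySeconds) {
            this.retrievalDelayMillis = TimeUnit.SECONDS.toMillis(retrievalDelaySeconds);
        }

        /** Called when the persisted flow state changes. */
        void onStateChange(final PersistedFlowState state) {
            if (state == PersistedFlowState.STALE) {
                retrievableAfterTime = System.currentTimeMillis() + retrievalDelayMillis;
            } else {
                retrievableAfterTime = Long.MAX_VALUE;   // CURRENT or UNKNOWN: nothing to fetch
            }
        }

        /** Called by a periodic task before it asks a node for its flow. */
        boolean mayRetrieveNow() {
            return System.currentTimeMillis() >= retrievableAfterTime;
        }

        public static void main(final String[] args) throws InterruptedException {
            final RetrievalGateSketch gate = new RetrievalGateSketch(1);
            gate.onStateChange(PersistedFlowState.STALE);
            System.out.println(gate.mayRetrieveNow());   // false: still inside the delay
            Thread.sleep(1100);
            System.out.println(gate.mayRetrieveNow());   // true: delay elapsed, flow may be fetched
        }
    }
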
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
deleted file mode 100644
index 3a1dfb2..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.nifi.cluster.event.Event;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
-import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
-import org.apache.nifi.cluster.node.Node;
-import org.apache.nifi.cluster.node.Node.Status;
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.remote.cluster.NodeInformant;
-import org.apache.nifi.reporting.BulletinRepository;
-
-/**
- * Defines the interface for a ClusterManager. The cluster manager is a
- * threadsafe centralized manager for a cluster. Members of a cluster are nodes.
- * A member becomes a node by issuing a connection request to the manager. The
- * manager maintains the set of nodes. Nodes may be disconnected, reconnected,
- * and deleted.
- *
- * Nodes are responsible for sending heartbeats to the manager to indicate their
- * liveliness. A manager may disconnect a node if it does not receive a
- * heartbeat within a configurable time period. A cluster manager instance may
- * be configured with how often to monitor received heartbeats
- * (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may
- * elapse between node heartbeats before disconnecting the node
- * (getMaxHeartbeatGapSeconds()).
- *
- * Since only a single node may execute isolated processors, the cluster manager
- * maintains the notion of a primary node. The primary node is chosen at cluster
- * startup and retains the role until a user requests a different node to be the
- * primary node.
- *
- * @author unattributed
- */
-public interface ClusterManager extends NodeInformant {
-
-    /**
-     * Handles a node's heartbeat.
-     *
-     * @param heartbeat a heartbeat
-     *
-     */
-    void handleHeartbeat(Heartbeat heartbeat);
-
-    /**
-     * @param statuses the statuses of the nodes
-     * @return the set of nodes
-     */
-    Set<Node> getNodes(Status... statuses);
-
-    /**
-     * @param nodeId the node identifier
-     * @return the node with the given identifier, or null if the node does
-     * not exist
-     */
-    Node getNode(String nodeId);
-
-    /**
-     * @param statuses the node statuses to filter on
-     * @return the set of node identifiers with the given node statuses
-     */
-    Set<NodeIdentifier> getNodeIds(Status... statuses);
-
-    /**
-     * Deletes the node with the given node identifier. If the given node is the
-     * primary node, then a subsequent request may be made to the manager to set
-     * a new primary node.
-     *
-     * @param nodeId the node identifier
-     * @param userDn the Distinguished Name of the user requesting the node be
-     * deleted from the cluster
-     *
-     * @throws UnknownNodeException if the node does not exist
-     * @throws IllegalNodeDeletionException if the node is not in a disconnected
-     * state
-     */
-    void deleteNode(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDeletionException;
-
-    /**
-     * Requests a connection to the cluster.
-     *
-     * @param request the request
-     *
-     * @return the response
-     */
-    ConnectionResponse requestConnection(ConnectionRequest request);
-
-    /**
-     * Services reconnection requests for a given node. If the node indicates
-     * reconnection failure, then the node will be set to disconnected.
-     * Otherwise, a reconnection request will be sent to the node, initiating
-     * the connection handshake.
-     *
-     * @param nodeId a node identifier
-     * @param userDn the Distinguished Name of the user requesting the
-     * reconnection
-     *
-     * @throws UnknownNodeException if the node does not exist
-     * @throws IllegalNodeReconnectionException if the node is not disconnected
-     */
-    void requestReconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeReconnectionException;
-
-    /**
-     * Requests the node with the given identifier be disconnected.
-     *
-     * @param nodeId the node identifier
-     * @param userDn the Distinguished Name of the user requesting the
-     * disconnection
-     *
-     * @throws UnknownNodeException if the node does not exist
-     * @throws IllegalNodeDisconnectionException if the node cannot be
-     * disconnected due to the cluster's state (e.g., node is last connected
-     * node or node is primary)
-     * @throws NodeDisconnectionException if the disconnection failed
-     */
-    void requestDisconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException;
-
-    /**
-     * @return the time in seconds to wait between successive executions of
-     * heartbeat monitoring
-     */
-    int getHeartbeatMonitoringIntervalSeconds();
-
-    /**
-     * @return the maximum time in seconds that is allowed between successive
-     * heartbeats of a node before disconnecting the node
-     */
-    int getMaxHeartbeatGapSeconds();
-
-    /**
-     * Returns a list of node events for the node with the given identifier. The
-     * events will be returned in order of most recent to least recent according
-     * to the creation date of the event.
-     *
-     * @param nodeId the node identifier
-     *
-     * @return the list of events or an empty list if no node exists with the
-     * given identifier
-     */
-    List<Event> getNodeEvents(final String nodeId);
-
-    /**
-     * Revokes the primary role from the current primary node and assigns the
-     * primary role to the given node ID.
-     *
-     * If role revocation fails, then the current primary node is set to
-     * disconnected while retaining the primary role and no role assignment is
-     * performed.
-     *
-     * If role assignment fails, then the given node is set to disconnected and
-     * is given the primary role.
-     *
-     * @param nodeId the node identifier
-     * @param userDn the Distinguished Name of the user requesting that the
-     * Primary Node be assigned
-     *
-     * @throws UnknownNodeException if the node with the given identifier does
-     * not exist
-     * @throws IneligiblePrimaryNodeException if the node with the given
-     * identifier is not eligible to be the primary node
-     * @throws PrimaryRoleAssignmentException if the cluster was unable to
-     * change the primary role to the requested node
-     */
-    void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
-
-    /**
-     * @return the primary node of the cluster or null if no primary node exists
-     */
-    Node getPrimaryNode();
-
-    /**
-     * Returns the bulletin repository.
-     *
-     * @return the bulletin repository
-     */
-    BulletinRepository getBulletinRepository();
-
-    /**
-     * Returns a {@link ProcessGroupStatus} that represents the status of all
-     * nodes with the given {@link Status}es for the given ProcessGroup id, or
-     * null if no nodes exist with the given statuses
-     *
-     * @param groupId the ProcessGroup identifier
-     * @return the merged ProcessGroupStatus, or null if no nodes exist with the given statuses
-     */
-    ProcessGroupStatus getProcessGroupStatus(String groupId);
-
-    /**
-     * Returns a merged representation of the System Diagnostics for all nodes
-     * in the cluster
-     *
-     * @return the merged SystemDiagnostics for all nodes in the cluster
-     */
-    SystemDiagnostics getSystemDiagnostics();
-}

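The ClusterManager javadoc above describes heartbeat monitoring: received heartbeats are checked on the interval given by getHeartbeatMonitoringIntervalSeconds(), and a node is disconnected when its heartbeat gap exceeds getMaxHeartbeatGapSeconds(). The sketch below is a minimal, stand-alone illustration of that contract only; it is not NiFi's implementation, and its class, method, and node names are hypothetical.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    public class HeartbeatMonitorSketch {

        // Last heartbeat time (epoch millis) per node identifier.
        private final Map<String, Long> lastHeartbeatMillis = new ConcurrentHashMap<>();
        private final long maxGapMillis;

        HeartbeatMonitorSketch(final long maxHeartbeatGapSeconds) {
            this.maxGapMillis = TimeUnit.SECONDS.toMillis(maxHeartbeatGapSeconds);
        }

        /** Records a heartbeat from a node, in the spirit of ClusterManager#handleHeartbeat. */
        void handleHeartbeat(final String nodeId) {
            lastHeartbeatMillis.put(nodeId, System.currentTimeMillis());
        }

        /** Reports (rather than disconnects) every node whose heartbeat gap exceeds the maximum. */
        void checkHeartbeats() {
            final long now = System.currentTimeMillis();
            lastHeartbeatMillis.forEach((nodeId, last) -> {
                if (now - last > maxGapMillis) {
                    System.out.println("Would disconnect node " + nodeId + " (heartbeat gap exceeded)");
                }
            });
        }

        public static void main(final String[] args) throws InterruptedException {
            final HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(1);
            monitor.handleHeartbeat("node-1");   // hypothetical node id
            Thread.sleep(1500);
            monitor.checkHeartbeats();           // reports node-1 as overdue
        }
    }
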
