nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [47/51] [partial] nifi git commit: NIFI-850 removed nifi parent, updated nifi pom, moved all nifi subdirs up one level, fixed readme.
Date Sat, 15 Aug 2015 17:13:36 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
new file mode 100644
index 0000000..ff8dc50
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+/**
+ *
+ */
+public enum RepositoryRecordType {
+
+    UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
new file mode 100644
index 0000000..53cc44f
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+/**
+ * <p>
+ * A ContentClaim is a reference to a given flow file's content. Multiple flow
+ * files may reference the same content by both having the same content
+ * claim.</p>
+ *
+ * <p>
+ * Must be thread safe</p>
+ *
+ */
+public interface ContentClaim extends Comparable<ContentClaim> {
+
+    /**
+     * @return the unique identifier for this claim
+     */
+    String getId();
+
+    /**
+     * @return the container identifier in which this claim is held
+     */
+    String getContainer();
+
+    /**
+     * @return the section within a given container the claim is held
+     */
+    String getSection();
+
+    /**
+     * @return Indicates whether or not the Claim is loss-tolerant. If so, we will
+     * attempt to keep the content but will not sacrifice a great deal of
+     * performance to do so
+     */
+    boolean isLossTolerant();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
new file mode 100644
index 0000000..bffcec3
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for managing all ContentClaims that are used in the application
+ */
+public interface ContentClaimManager {
+
+    /**
+     * Creates a new Content Claim with the given id, container, section, and
+     * loss tolerance.
+     *
+     * @param id of claim
+     * @param container of claim
+     * @param section of claim
+     * @param lossTolerant of claim
+     * @return new claim
+     */
+    ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant);
+
+    /**
+     * @param claim to obtain reference count for
+     * @return the number of FlowFiles that hold a claim to a particular piece
+     * of FlowFile content
+     */
+    int getClaimantCount(ContentClaim claim);
+
+    /**
+     * Decreases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to decrement claimants on
+     * @return new claimaint count
+     */
+    int decrementClaimantCount(ContentClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to increment claims on
+     * @return new claimant count
+     */
+    int incrementClaimantCount(ContentClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count.
+     *
+     * If it is known that the Content Claim whose count is being incremented is
+     * a newly created ContentClaim, this method should be called with a value
+     * of {@code true} as the second argument, as it may allow the manager to
+     * optimize its tasks, knowing that the Content Claim cannot be referenced
+     * by any other component
+     *
+     * @param claim to increment
+     * @param newClaim provides a hint that no other process can have access to this
+     * claim right now
+     * @return new claim count
+     */
+    int incrementClaimantCount(ContentClaim claim, boolean newClaim);
+
+    /**
+     * Indicates that the given ContentClaim can now be destroyed by the
+     * appropriate Content Repository. This should be done only after it is
+     * guaranteed that the FlowFile Repository has been synchronized with its
+     * underlying storage component. This way, we avoid the following sequence
+     * of events:
+     * <ul>
+     * <li>FlowFile Repository is updated to indicate that FlowFile F no longer
+     * depends on ContentClaim C</li>
+     * <li>ContentClaim C is no longer needed and is destroyed</li>
+     * <li>The Operating System crashes or there is a power failure</li>
+     * <li>Upon restart, the FlowFile Repository was not synchronized with its
+     * underlying storage mechanism and as such indicates that FlowFile F needs
+     * ContentClaim C.</li>
+     * <li>Since ContentClaim C has already been destroyed, it is inaccessible,
+     * and FlowFile F's Content is not found, so the FlowFile is removed,
+     * resulting in data loss.</li>
+     * </ul>
+     *
+     * <p>
+     * Using this method of marking the ContentClaim as destructable only when
+     * the FlowFile repository has been synced with the underlying storage
+     * mechanism, we can ensure that on restart, we will not point to this
+     * unneeded claim. As such, it is now safe to destroy the contents.
+     * </p>
+     *
+     * @param claim to mark as now destructable
+     */
+    void markDestructable(ContentClaim claim);
+
+    /**
+     * Drains up to {@code maxElements} Content Claims from the internal queue
+     * of destructable content claims to the given {@code destination} so that
+     * they can be destroyed.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     */
+    void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements);
+
+    /**
+     * Drains up to {@code maxElements} Content Claims from the internal queue
+     * of destructable content claims to the given {@code destination} so that
+     * they can be destroyed. If no ContentClaim is ready to be destroyed at
+     * this time, will wait up to the specified amount of time before returning.
+     * If, after the specified amount of time, there is still no ContentClaim
+     * ready to be destroyed, the method will return without having added
+     * anything to the given {@code destination}.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     * @param timeout maximum time to wait
+     * @param unit unit of time to wait
+     */
+    void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit);
+
+    /**
+     * Clears the manager's memory of any and all ContentClaims that it knows
+     * about
+     */
+    void purge();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
new file mode 100644
index 0000000..cebb2b4
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+/**
+ */
+public class ConnectionStatus implements Cloneable {
+
+    private String id;
+    private String groupId;
+    private String name;
+    private String sourceId;
+    private String sourceName;
+    private String destinationId;
+    private String destinationName;
+    private int inputCount;
+    private long inputBytes;
+    private int queuedCount;
+    private long queuedBytes;
+    private int outputCount;
+    private long outputBytes;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(final String groupId) {
+        this.groupId = groupId;
+    }
+
+    public int getQueuedCount() {
+        return queuedCount;
+    }
+
+    public void setQueuedCount(final int queuedCount) {
+        this.queuedCount = queuedCount;
+    }
+
+    public long getQueuedBytes() {
+        return queuedBytes;
+    }
+
+    public void setQueuedBytes(final long queuedBytes) {
+        this.queuedBytes = queuedBytes;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public void setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+    }
+
+    public int getInputCount() {
+        return inputCount;
+    }
+
+    public void setInputCount(int inputCount) {
+        this.inputCount = inputCount;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+    }
+
+    public int getOutputCount() {
+        return outputCount;
+    }
+
+    public void setOutputCount(int outputCount) {
+        this.outputCount = outputCount;
+    }
+
+    @Override
+    public ConnectionStatus clone() {
+        final ConnectionStatus clonedObj = new ConnectionStatus();
+        clonedObj.groupId = groupId;
+        clonedObj.id = id;
+        clonedObj.inputBytes = inputBytes;
+        clonedObj.inputCount = inputCount;
+        clonedObj.name = name;
+        clonedObj.outputBytes = outputBytes;
+        clonedObj.outputCount = outputCount;
+        clonedObj.queuedBytes = queuedBytes;
+        clonedObj.queuedCount = queuedCount;
+        clonedObj.sourceId = sourceId;
+        clonedObj.sourceName = sourceName;
+        clonedObj.destinationId = destinationId;
+        clonedObj.destinationName = destinationName;
+        return clonedObj;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ConnectionStatus [id=");
+        builder.append(id);
+        builder.append(", groupId=");
+        builder.append(groupId);
+        builder.append(", name=");
+        builder.append(name);
+        builder.append(", sourceId=");
+        builder.append(sourceId);
+        builder.append(", sourceName=");
+        builder.append(sourceName);
+        builder.append(", destinationId=");
+        builder.append(destinationId);
+        builder.append(", destinationName=");
+        builder.append(destinationName);
+        builder.append(", inputCount=");
+        builder.append(inputCount);
+        builder.append(", inputBytes=");
+        builder.append(inputBytes);
+        builder.append(", queuedCount=");
+        builder.append(queuedCount);
+        builder.append(", queuedBytes=");
+        builder.append(queuedBytes);
+        builder.append(", outputCount=");
+        builder.append(outputCount);
+        builder.append(", outputBytes=");
+        builder.append(outputBytes);
+        builder.append("]");
+        return builder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java
new file mode 100644
index 0000000..0d248cd
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+/**
+ * The status of a port.
+ */
+public class PortStatus implements Cloneable {
+
+    private String id;
+    private String groupId;
+    private String name;
+    private Integer activeThreadCount;
+    private int inputCount;
+    private long inputBytes;
+    private int outputCount;
+    private long outputBytes;
+    private int flowFilesReceived;
+    private long bytesReceived;
+    private int flowFilesSent;
+    private long bytesSent;
+    private Boolean transmitting;
+    private RunStatus runStatus;
+
+    public Boolean isTransmitting() {
+        return transmitting;
+    }
+
+    public void setTransmitting(Boolean transmitting) {
+        this.transmitting = transmitting;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Integer getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(Integer activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    public int getInputCount() {
+        return inputCount;
+    }
+
+    public void setInputCount(int inputCount) {
+        this.inputCount = inputCount;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+    }
+
+    public int getOutputCount() {
+        return outputCount;
+    }
+
+    public void setOutputCount(final int outputCount) {
+        this.outputCount = outputCount;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public void setOutputBytes(final long outputBytes) {
+        this.outputBytes = outputBytes;
+    }
+
+    public RunStatus getRunStatus() {
+        return runStatus;
+    }
+
+    public void setRunStatus(RunStatus runStatus) {
+        this.runStatus = runStatus;
+    }
+
+    public int getFlowFilesReceived() {
+        return flowFilesReceived;
+    }
+
+    public void setFlowFilesReceived(int flowFilesReceived) {
+        this.flowFilesReceived = flowFilesReceived;
+    }
+
+    public long getBytesReceived() {
+        return bytesReceived;
+    }
+
+    public void setBytesReceived(long bytesReceived) {
+        this.bytesReceived = bytesReceived;
+    }
+
+    public int getFlowFilesSent() {
+        return flowFilesSent;
+    }
+
+    public void setFlowFilesSent(int flowFilesSent) {
+        this.flowFilesSent = flowFilesSent;
+    }
+
+    public long getBytesSent() {
+        return bytesSent;
+    }
+
+    public void setBytesSent(long bytesSent) {
+        this.bytesSent = bytesSent;
+    }
+
+    public Boolean getTransmitting() {
+        return transmitting;
+    }
+
+    @Override
+    public PortStatus clone() {
+        final PortStatus clonedObj = new PortStatus();
+        clonedObj.id = id;
+        clonedObj.groupId = groupId;
+        clonedObj.name = name;
+        clonedObj.activeThreadCount = activeThreadCount;
+        clonedObj.inputBytes = inputBytes;
+        clonedObj.inputCount = inputCount;
+        clonedObj.outputBytes = outputBytes;
+        clonedObj.outputCount = outputCount;
+        clonedObj.flowFilesReceived = flowFilesReceived;
+        clonedObj.bytesReceived = bytesReceived;
+        clonedObj.flowFilesSent = flowFilesSent;
+        clonedObj.bytesSent = bytesSent;
+        clonedObj.transmitting = transmitting;
+        clonedObj.runStatus = runStatus;
+        return clonedObj;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("PortStatus [id=");
+        builder.append(id);
+        builder.append(", groupId=");
+        builder.append(groupId);
+        builder.append(", name=");
+        builder.append(name);
+        builder.append(", activeThreadCount=");
+        builder.append(activeThreadCount);
+        builder.append(", transmitting=");
+        builder.append(transmitting);
+        builder.append(", inputCount=");
+        builder.append(inputCount);
+        builder.append(", inputBytes=");
+        builder.append(inputBytes);
+        builder.append(", outputCount=");
+        builder.append(outputCount);
+        builder.append(", outputBytes=");
+        builder.append(outputBytes);
+        builder.append(", runStatus=");
+        builder.append(runStatus);
+        builder.append("]");
+        return builder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
new file mode 100644
index 0000000..eb0339f
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class ProcessGroupStatus implements Cloneable {
+
+    private String id;
+    private String name;
+    private Integer inputCount;
+    private Long inputContentSize;
+    private Integer outputCount;
+    private Long outputContentSize;
+    private long creationTimestamp;
+    private Integer activeThreadCount;
+    private Integer queuedCount;
+    private Long queuedContentSize;
+    private Long bytesRead;
+    private Long bytesWritten;
+    private int flowFilesReceived;
+    private long bytesReceived;
+    private int flowFilesSent;
+    private long bytesSent;
+    private int flowFilesTransferred;
+    private long bytesTransferred;
+
+    private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
+    private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
+    private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>();
+    private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>();
+    private Collection<PortStatus> inputPortStatus = new ArrayList<>();
+    private Collection<PortStatus> outputPortStatus = new ArrayList<>();
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Integer getInputCount() {
+        return inputCount;
+    }
+
+    public void setInputCount(final Integer inputCount) {
+        this.inputCount = inputCount;
+    }
+
+    public Long getInputContentSize() {
+        return inputContentSize;
+    }
+
+    public void setInputContentSize(final Long inputContentSize) {
+        this.inputContentSize = inputContentSize;
+    }
+
+    public Integer getOutputCount() {
+        return outputCount;
+    }
+
+    public void setOutputCount(final Integer outputCount) {
+        this.outputCount = outputCount;
+    }
+
+    public Long getOutputContentSize() {
+        return outputContentSize;
+    }
+
+    public void setOutputContentSize(final Long outputContentSize) {
+        this.outputContentSize = outputContentSize;
+    }
+
+    public Long getBytesRead() {
+        return bytesRead;
+    }
+
+    public void setBytesRead(final Long bytesRead) {
+        this.bytesRead = bytesRead;
+    }
+
+    public Long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    public void setBytesWritten(final Long bytesWritten) {
+        this.bytesWritten = bytesWritten;
+    }
+
+    public Integer getQueuedCount() {
+        return this.queuedCount;
+    }
+
+    public void setQueuedCount(final Integer queuedCount) {
+        this.queuedCount = queuedCount;
+    }
+
+    public Long getQueuedContentSize() {
+        return queuedContentSize;
+    }
+
+    public void setQueuedContentSize(final Long queuedContentSize) {
+        this.queuedContentSize = queuedContentSize;
+    }
+
+    public long getCreationTimestamp() {
+        return creationTimestamp;
+    }
+
+    public void setCreationTimestamp(final long creationTimestamp) {
+        this.creationTimestamp = creationTimestamp;
+    }
+
+    public Integer getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(final Integer activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    public Collection<ConnectionStatus> getConnectionStatus() {
+        return connectionStatus;
+    }
+
+    public void setConnectionStatus(final Collection<ConnectionStatus> connectionStatus) {
+        this.connectionStatus = connectionStatus;
+    }
+
+    public Collection<ProcessorStatus> getProcessorStatus() {
+        return processorStatus;
+    }
+
+    public void setProcessorStatus(final Collection<ProcessorStatus> processorStatus) {
+        this.processorStatus = processorStatus;
+    }
+
+    public Collection<ProcessGroupStatus> getProcessGroupStatus() {
+        return processGroupStatus;
+    }
+
+    public void setProcessGroupStatus(final Collection<ProcessGroupStatus> processGroupStatus) {
+        this.processGroupStatus = processGroupStatus;
+    }
+
+    public Collection<PortStatus> getInputPortStatus() {
+        return inputPortStatus;
+    }
+
+    public void setInputPortStatus(Collection<PortStatus> inputPortStatus) {
+        this.inputPortStatus = inputPortStatus;
+    }
+
+    public Collection<PortStatus> getOutputPortStatus() {
+        return outputPortStatus;
+    }
+
+    public void setOutputPortStatus(Collection<PortStatus> outputPortStatus) {
+        this.outputPortStatus = outputPortStatus;
+    }
+
+    public Collection<RemoteProcessGroupStatus> getRemoteProcessGroupStatus() {
+        return remoteProcessGroupStatus;
+    }
+
+    public void setRemoteProcessGroupStatus(final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus) {
+        this.remoteProcessGroupStatus = remoteProcessGroupStatus;
+    }
+
+    public int getFlowFilesReceived() {
+        return flowFilesReceived;
+    }
+
+    public void setFlowFilesReceived(final int flowFilesReceived) {
+        this.flowFilesReceived = flowFilesReceived;
+    }
+
+    public long getBytesReceived() {
+        return bytesReceived;
+    }
+
+    public void setBytesReceived(final long bytesReceived) {
+        this.bytesReceived = bytesReceived;
+    }
+
+    public int getFlowFilesSent() {
+        return flowFilesSent;
+    }
+
+    public void setFlowFilesSent(final int flowFilesSent) {
+        this.flowFilesSent = flowFilesSent;
+    }
+
+    public long getBytesSent() {
+        return bytesSent;
+    }
+
+    public void setBytesSent(final long bytesSent) {
+        this.bytesSent = bytesSent;
+    }
+
+    public int getFlowFilesTransferred() {
+        return flowFilesTransferred;
+    }
+
+    public void setFlowFilesTransferred(int flowFilesTransferred) {
+        this.flowFilesTransferred = flowFilesTransferred;
+    }
+
+    public long getBytesTransferred() {
+        return bytesTransferred;
+    }
+
+    public void setBytesTransferred(long bytesTransferred) {
+        this.bytesTransferred = bytesTransferred;
+    }
+
+    @Override
+    public ProcessGroupStatus clone() {
+
+        final ProcessGroupStatus clonedObj = new ProcessGroupStatus();
+
+        clonedObj.creationTimestamp = creationTimestamp;
+        clonedObj.id = id;
+        clonedObj.name = name;
+        clonedObj.outputContentSize = outputContentSize;
+        clonedObj.outputCount = outputCount;
+        clonedObj.inputContentSize = inputContentSize;
+        clonedObj.inputCount = inputCount;
+        clonedObj.activeThreadCount = activeThreadCount;
+        clonedObj.queuedContentSize = queuedContentSize;
+        clonedObj.queuedCount = queuedCount;
+        clonedObj.bytesRead = bytesRead;
+        clonedObj.bytesWritten = bytesWritten;
+        clonedObj.flowFilesReceived = flowFilesReceived;
+        clonedObj.bytesReceived = bytesReceived;
+        clonedObj.flowFilesSent = flowFilesSent;
+        clonedObj.bytesSent = bytesSent;
+        clonedObj.flowFilesTransferred = flowFilesTransferred;
+        clonedObj.bytesTransferred = bytesTransferred;
+
+        if (connectionStatus != null) {
+            final Collection<ConnectionStatus> statusList = new ArrayList<>();
+            clonedObj.setConnectionStatus(statusList);
+            for (final ConnectionStatus status : connectionStatus) {
+                statusList.add(status.clone());
+            }
+        }
+
+        if (processorStatus != null) {
+            final Collection<ProcessorStatus> statusList = new ArrayList<>();
+            clonedObj.setProcessorStatus(statusList);
+            for (final ProcessorStatus status : processorStatus) {
+                statusList.add(status.clone());
+            }
+        }
+
+        if (inputPortStatus != null) {
+            final Collection<PortStatus> statusList = new ArrayList<>();
+            clonedObj.setInputPortStatus(statusList);
+            for (final PortStatus status : inputPortStatus) {
+                statusList.add(status.clone());
+            }
+        }
+
+        if (outputPortStatus != null) {
+            final Collection<PortStatus> statusList = new ArrayList<>();
+            clonedObj.setOutputPortStatus(statusList);
+            for (final PortStatus status : outputPortStatus) {
+                statusList.add(status.clone());
+            }
+        }
+
+        if (processGroupStatus != null) {
+            final Collection<ProcessGroupStatus> statusList = new ArrayList<>();
+            clonedObj.setProcessGroupStatus(statusList);
+            for (final ProcessGroupStatus status : processGroupStatus) {
+                statusList.add(status.clone());
+            }
+        }
+
+        if (remoteProcessGroupStatus != null) {
+            final Collection<RemoteProcessGroupStatus> statusList = new ArrayList<>();
+            clonedObj.setRemoteProcessGroupStatus(statusList);
+            for (final RemoteProcessGroupStatus status : remoteProcessGroupStatus) {
+                statusList.add(status.clone());
+            }
+        }
+
+        return clonedObj;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ProcessGroupStatus [id=");
+        builder.append(id);
+        builder.append(", inputCount=");
+        builder.append(inputCount);
+        builder.append(", inputBytes=");
+        builder.append(inputContentSize);
+        builder.append(", outputCount=");
+        builder.append(outputCount);
+        builder.append(", outputBytes=");
+        builder.append(outputContentSize);
+        builder.append(", creationTimestamp=");
+        builder.append(creationTimestamp);
+        builder.append(", activeThreadCount=");
+        builder.append(activeThreadCount);
+        builder.append(", flowFilesTransferred=");
+        builder.append(flowFilesTransferred);
+        builder.append(", bytesTransferred=");
+        builder.append(bytesTransferred);
+        builder.append(", flowFilesReceived=");
+        builder.append(flowFilesReceived);
+        builder.append(", bytesReceived=");
+        builder.append(bytesReceived);
+        builder.append(", flowFilesSent=");
+        builder.append(flowFilesSent);
+        builder.append(", bytesSent=");
+        builder.append(bytesSent);
+        builder.append(",\n\tconnectionStatus=");
+
+        for (final ConnectionStatus status : connectionStatus) {
+            builder.append("\n\t\t");
+            builder.append(status);
+        }
+
+        builder.append(",\n\tprocessorStatus=");
+
+        for (final ProcessorStatus status : processorStatus) {
+            builder.append("\n\t\t");
+            builder.append(status);
+        }
+
+        builder.append(",\n\tprocessGroupStatus=");
+
+        for (final ProcessGroupStatus status : processGroupStatus) {
+            builder.append("\n\t\t");
+            builder.append(status);
+        }
+
+        builder.append(",\n\tremoteProcessGroupStatus=");
+        for (final RemoteProcessGroupStatus status : remoteProcessGroupStatus) {
+            builder.append("\n\t\t");
+            builder.append(status);
+        }
+
+        builder.append(",\n\tinputPortStatus=");
+        for (final PortStatus status : inputPortStatus) {
+            builder.append("\n\t\t");
+            builder.append(status);
+        }
+
+        builder.append(",\n\toutputPortStatus=");
+        for (final PortStatus status : outputPortStatus) {
+            builder.append("\n\t\t");
+            builder.append(status);
+        }
+
+        builder.append("]");
+        return builder.toString();
+    }
+
+    public static void merge(final ProcessGroupStatus target, final ProcessGroupStatus toMerge) {
+        if (target == null || toMerge == null) {
+            return;
+        }
+
+        target.setInputCount(target.getInputCount() + toMerge.getInputCount());
+        target.setInputContentSize(target.getInputContentSize() + toMerge.getInputContentSize());
+        target.setOutputCount(target.getOutputCount() + toMerge.getOutputCount());
+        target.setOutputContentSize(target.getOutputContentSize() + toMerge.getOutputContentSize());
+        target.setQueuedCount(target.getQueuedCount() + toMerge.getQueuedCount());
+        target.setQueuedContentSize(target.getQueuedContentSize() + toMerge.getQueuedContentSize());
+        target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
+        target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
+        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
+        target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
+        target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
+        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
+        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
+        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
+        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
+
+        // connection status
+        // sort by id
+        final Map<String, ConnectionStatus> mergedConnectionMap = new HashMap<>();
+        for (final ConnectionStatus status : target.getConnectionStatus()) {
+            mergedConnectionMap.put(status.getId(), status);
+        }
+
+        for (final ConnectionStatus statusToMerge : toMerge.getConnectionStatus()) {
+            ConnectionStatus merged = mergedConnectionMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merged.setQueuedCount(merged.getQueuedCount() + statusToMerge.getQueuedCount());
+            merged.setQueuedBytes(merged.getQueuedBytes() + statusToMerge.getQueuedBytes());
+            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
+            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
+            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
+            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
+        }
+        target.setConnectionStatus(mergedConnectionMap.values());
+
+        // processor status
+        final Map<String, ProcessorStatus> mergedProcessorMap = new HashMap<>();
+        for (final ProcessorStatus status : target.getProcessorStatus()) {
+            mergedProcessorMap.put(status.getId(), status);
+        }
+
+        for (final ProcessorStatus statusToMerge : toMerge.getProcessorStatus()) {
+            ProcessorStatus merged = mergedProcessorMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
+            merged.setBytesRead(merged.getBytesRead() + statusToMerge.getBytesRead());
+            merged.setBytesWritten(merged.getBytesWritten() + statusToMerge.getBytesWritten());
+            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
+            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
+            merged.setInvocations(merged.getInvocations() + statusToMerge.getInvocations());
+            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
+            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
+            merged.setProcessingNanos(merged.getProcessingNanos() + statusToMerge.getProcessingNanos());
+
+            // if the status to merge is invalid allow it to take precedence. whether the
+            // processor run status is disabled/stopped/running is part of the flow configuration
+            // and should not differ amongst nodes. however, whether a processor is invalid
+            // can be driven by environmental conditions. this check allows any of those to
+            // take precedence over the configured run status.
+            if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
+                merged.setRunStatus(RunStatus.Invalid);
+            }
+        }
+        target.setProcessorStatus(mergedProcessorMap.values());
+
+        // input ports
+        final Map<String, PortStatus> mergedInputPortMap = new HashMap<>();
+        for (final PortStatus status : target.getInputPortStatus()) {
+            mergedInputPortMap.put(status.getId(), status);
+        }
+
+        for (final PortStatus statusToMerge : toMerge.getInputPortStatus()) {
+            PortStatus merged = mergedInputPortMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
+            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
+            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
+            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
+            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
+            if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) {
+                merged.setTransmitting(true);
+            }
+
+            // should be unnecessary here since ports run status should not be affected by
+            // environmental conditions but doing so in case that changes
+            if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
+                merged.setRunStatus(RunStatus.Invalid);
+            }
+        }
+        target.setInputPortStatus(mergedInputPortMap.values());
+
+        // output ports
+        final Map<String, PortStatus> mergedOutputPortMap = new HashMap<>();
+        for (final PortStatus status : target.getOutputPortStatus()) {
+            mergedOutputPortMap.put(status.getId(), status);
+        }
+
+        for (final PortStatus statusToMerge : toMerge.getOutputPortStatus()) {
+            PortStatus merged = mergedOutputPortMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
+            merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount());
+            merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
+            merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
+            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
+            if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) {
+                merged.setTransmitting(true);
+            }
+
+            // should be unnecessary here since ports run status not should be affected by
+            // environmental conditions but doing so in case that changes
+            if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) {
+                merged.setRunStatus(RunStatus.Invalid);
+            }
+        }
+        target.setOutputPortStatus(mergedOutputPortMap.values());
+
+        // child groups
+        final Map<String, ProcessGroupStatus> mergedGroupMap = new HashMap<>();
+        for (final ProcessGroupStatus status : target.getProcessGroupStatus()) {
+            mergedGroupMap.put(status.getId(), status);
+        }
+
+        for (final ProcessGroupStatus statusToMerge : toMerge.getProcessGroupStatus()) {
+            ProcessGroupStatus merged = mergedGroupMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            merge(merged, statusToMerge);
+        }
+        target.setOutputPortStatus(mergedOutputPortMap.values());
+
+        // remote groups
+        final Map<String, RemoteProcessGroupStatus> mergedRemoteGroupMap = new HashMap<>();
+        for (final RemoteProcessGroupStatus status : target.getRemoteProcessGroupStatus()) {
+            mergedRemoteGroupMap.put(status.getId(), status);
+        }
+
+        for (final RemoteProcessGroupStatus statusToMerge : toMerge.getRemoteProcessGroupStatus()) {
+            RemoteProcessGroupStatus merged = mergedRemoteGroupMap.get(statusToMerge.getId());
+            if (merged == null) {
+                mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
+                continue;
+            }
+
+            // NOTE - active/inactive port counts are not merged since that state is considered part of the flow (like runStatus)
+            merged.setReceivedContentSize(merged.getReceivedContentSize() + statusToMerge.getReceivedContentSize());
+            merged.setReceivedCount(merged.getReceivedCount() + statusToMerge.getReceivedCount());
+            merged.setSentContentSize(merged.getSentContentSize() + statusToMerge.getSentContentSize());
+            merged.setSentCount(merged.getSentCount() + statusToMerge.getSentCount());
+            merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
+
+            List<String> mergedAuthenticationIssues = merged.getAuthorizationIssues();
+            if (mergedAuthenticationIssues == null) {
+                mergedAuthenticationIssues = new ArrayList<>();
+            }
+
+            final List<String> nodeAuthorizationIssues = statusToMerge.getAuthorizationIssues();
+            if (nodeAuthorizationIssues != null && !nodeAuthorizationIssues.isEmpty()) {
+                mergedAuthenticationIssues.addAll(nodeAuthorizationIssues);
+            }
+
+            merged.setAuthorizationIssues(mergedAuthenticationIssues);
+        }
+
+        target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
new file mode 100644
index 0000000..54be7ba
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class ProcessorStatus implements Cloneable {
+
+    private String id;
+    private String groupId;
+    private String name;
+    private String type;
+    private RunStatus runStatus;
+    private int inputCount;
+    private long inputBytes;
+    private int outputCount;
+    private long outputBytes;
+    private long bytesRead;
+    private long bytesWritten;
+    private int invocations;
+    private long processingNanos;
+    private int flowFilesRemoved;
+    private long averageLineageDuration;
+    private int activeThreadCount;
+    private int flowFilesReceived;
+    private long bytesReceived;
+    private int flowFilesSent;
+    private long bytesSent;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(final String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(final String type) {
+        this.type = type;
+    }
+
+    public int getInputCount() {
+        return inputCount;
+    }
+
+    public RunStatus getRunStatus() {
+        return runStatus;
+    }
+
+    public void setRunStatus(RunStatus runStatus) {
+        this.runStatus = runStatus;
+    }
+
+    public void setInputCount(final int inputCount) {
+        this.inputCount = inputCount;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public void setInputBytes(final long inputBytes) {
+        this.inputBytes = inputBytes;
+    }
+
+    public int getOutputCount() {
+        return outputCount;
+    }
+
+    public void setOutputCount(final int outputCount) {
+        this.outputCount = outputCount;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public void setOutputBytes(final long outputBytes) {
+        this.outputBytes = outputBytes;
+    }
+
+    public long getBytesRead() {
+        return bytesRead;
+    }
+
+    public void setBytesRead(final long bytesRead) {
+        this.bytesRead = bytesRead;
+    }
+
+    public long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    public void setBytesWritten(final long bytesWritten) {
+        this.bytesWritten = bytesWritten;
+    }
+
+    public int getInvocations() {
+        return invocations;
+    }
+
+    public void setInvocations(final int invocations) {
+        this.invocations = invocations;
+    }
+
+    public long getProcessingNanos() {
+        return processingNanos;
+    }
+
+    public void setProcessingNanos(final long processingNanos) {
+        this.processingNanos = processingNanos;
+    }
+
+    public long getAverageLineageDuration(final TimeUnit timeUnit) {
+        return TimeUnit.MILLISECONDS.convert(averageLineageDuration, timeUnit);
+    }
+
+    public void setAverageLineageDuration(final long duration, final TimeUnit timeUnit) {
+        this.averageLineageDuration = timeUnit.toMillis(duration);
+    }
+
+    public long getAverageLineageDuration() {
+        return averageLineageDuration;
+    }
+
+    public void setAverageLineageDuration(final long millis) {
+        this.averageLineageDuration = millis;
+    }
+
+    public int getFlowFilesRemoved() {
+        return flowFilesRemoved;
+    }
+
+    public void setFlowFilesRemoved(int flowFilesRemoved) {
+        this.flowFilesRemoved = flowFilesRemoved;
+    }
+
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(final int activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    public int getFlowFilesReceived() {
+        return flowFilesReceived;
+    }
+
+    public void setFlowFilesReceived(int flowFilesReceived) {
+        this.flowFilesReceived = flowFilesReceived;
+    }
+
+    public long getBytesReceived() {
+        return bytesReceived;
+    }
+
+    public void setBytesReceived(long bytesReceived) {
+        this.bytesReceived = bytesReceived;
+    }
+
+    public int getFlowFilesSent() {
+        return flowFilesSent;
+    }
+
+    public void setFlowFilesSent(int flowFilesSent) {
+        this.flowFilesSent = flowFilesSent;
+    }
+
+    public long getBytesSent() {
+        return bytesSent;
+    }
+
+    public void setBytesSent(long bytesSent) {
+        this.bytesSent = bytesSent;
+    }
+
+    @Override
+    public ProcessorStatus clone() {
+        final ProcessorStatus clonedObj = new ProcessorStatus();
+        clonedObj.activeThreadCount = activeThreadCount;
+        clonedObj.bytesRead = bytesRead;
+        clonedObj.bytesWritten = bytesWritten;
+        clonedObj.flowFilesReceived = flowFilesReceived;
+        clonedObj.bytesReceived = bytesReceived;
+        clonedObj.flowFilesSent = flowFilesSent;
+        clonedObj.bytesSent = bytesSent;
+        clonedObj.groupId = groupId;
+        clonedObj.id = id;
+        clonedObj.inputBytes = inputBytes;
+        clonedObj.inputCount = inputCount;
+        clonedObj.invocations = invocations;
+        clonedObj.name = name;
+        clonedObj.outputBytes = outputBytes;
+        clonedObj.outputCount = outputCount;
+        clonedObj.processingNanos = processingNanos;
+        clonedObj.averageLineageDuration = averageLineageDuration;
+        clonedObj.flowFilesRemoved = flowFilesRemoved;
+        clonedObj.runStatus = runStatus;
+        clonedObj.type = type;
+        return clonedObj;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ProcessorStatus [id=");
+        builder.append(id);
+        builder.append(", groupId=");
+        builder.append(groupId);
+        builder.append(", name=");
+        builder.append(name);
+        builder.append(", type=");
+        builder.append(type);
+        builder.append(", runStatus=");
+        builder.append(runStatus);
+        builder.append(", inputCount=");
+        builder.append(inputCount);
+        builder.append(", inputBytes=");
+        builder.append(inputBytes);
+        builder.append(", outputCount=");
+        builder.append(outputCount);
+        builder.append(", outputBytes=");
+        builder.append(outputBytes);
+        builder.append(", bytesRead=");
+        builder.append(bytesRead);
+        builder.append(", bytesWritten=");
+        builder.append(bytesWritten);
+        builder.append(", invocations=");
+        builder.append(invocations);
+        builder.append(", processingNanos=");
+        builder.append(processingNanos);
+        builder.append(", activeThreadCount=");
+        builder.append(activeThreadCount);
+        builder.append("]");
+        return builder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java
new file mode 100644
index 0000000..110972e
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class RemoteProcessGroupStatus implements Cloneable {
+
+    private String id;
+    private String groupId;
+    private TransmissionStatus transmissionStatus;
+    private String uri;
+    private String name;
+    private Integer activeThreadCount;
+    private int sentCount;
+    private long sentContentSize;
+    private int receivedCount;
+    private long receivedContentSize;
+    private Integer activeRemotePortCount;
+    private Integer inactiveRemotePortCount;
+
+    private long averageLineageDuration;
+    private List<String> authorizationIssues = new ArrayList<>();
+
+    public String getTargetUri() {
+        return uri;
+    }
+
+    public void setTargetUri(String uri) {
+        this.uri = uri;
+    }
+
+    public TransmissionStatus getTransmissionStatus() {
+        return transmissionStatus;
+    }
+
+    public void setTransmissionStatus(TransmissionStatus transmissionStatus) {
+        this.transmissionStatus = transmissionStatus;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public Integer getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(Integer activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    public Integer getSentCount() {
+        return sentCount;
+    }
+
+    public void setSentCount(Integer sentCount) {
+        this.sentCount = sentCount;
+    }
+
+    public Long getSentContentSize() {
+        return sentContentSize;
+    }
+
+    public void setSentContentSize(Long sentContentSize) {
+        this.sentContentSize = sentContentSize;
+    }
+
+    public Integer getReceivedCount() {
+        return receivedCount;
+    }
+
+    public void setReceivedCount(Integer receivedCount) {
+        this.receivedCount = receivedCount;
+    }
+
+    public Long getReceivedContentSize() {
+        return receivedContentSize;
+    }
+
+    public void setReceivedContentSize(Long receivedContentSize) {
+        this.receivedContentSize = receivedContentSize;
+    }
+
+    public Integer getActiveRemotePortCount() {
+        return activeRemotePortCount;
+    }
+
+    public void setActiveRemotePortCount(Integer activeRemotePortCount) {
+        this.activeRemotePortCount = activeRemotePortCount;
+    }
+
+    public Integer getInactiveRemotePortCount() {
+        return inactiveRemotePortCount;
+    }
+
+    public void setInactiveRemotePortCount(Integer inactiveRemotePortCount) {
+        this.inactiveRemotePortCount = inactiveRemotePortCount;
+    }
+
+    public List<String> getAuthorizationIssues() {
+        return new ArrayList<>(authorizationIssues);
+    }
+
+    public void setAuthorizationIssues(List<String> authorizationIssues) {
+        this.authorizationIssues = new ArrayList<>(Objects.requireNonNull(authorizationIssues));
+    }
+
+    public long getAverageLineageDuration() {
+        return averageLineageDuration;
+    }
+
+    public void setAverageLineageDuration(final long millis) {
+        this.averageLineageDuration = millis;
+    }
+
+    public long getAverageLineageDuration(final TimeUnit timeUnit) {
+        return TimeUnit.MILLISECONDS.convert(averageLineageDuration, timeUnit);
+    }
+
+    public void setAverageLineageDuration(final long duration, final TimeUnit timeUnit) {
+        this.averageLineageDuration = timeUnit.toMillis(duration);
+    }
+
+    @Override
+    public RemoteProcessGroupStatus clone() {
+        final RemoteProcessGroupStatus clonedObj = new RemoteProcessGroupStatus();
+        clonedObj.id = id;
+        clonedObj.groupId = groupId;
+        clonedObj.name = name;
+        clonedObj.uri = uri;
+        clonedObj.activeThreadCount = activeThreadCount;
+        clonedObj.transmissionStatus = transmissionStatus;
+        clonedObj.sentCount = sentCount;
+        clonedObj.sentContentSize = sentContentSize;
+        clonedObj.receivedCount = receivedCount;
+        clonedObj.receivedContentSize = receivedContentSize;
+        clonedObj.activeRemotePortCount = activeRemotePortCount;
+        clonedObj.inactiveRemotePortCount = inactiveRemotePortCount;
+        clonedObj.averageLineageDuration = averageLineageDuration;
+        clonedObj.authorizationIssues = getAuthorizationIssues();
+        return clonedObj;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("RemoteProcessGroupStatus [id=");
+        builder.append(id);
+        builder.append(", groupId=");
+        builder.append(groupId);
+        builder.append(", name=");
+        builder.append(name);
+        builder.append(", uri=");
+        builder.append(uri);
+        builder.append(", activeThreadCount=");
+        builder.append(activeThreadCount);
+        builder.append(", transmissionStatus=");
+        builder.append(transmissionStatus);
+        builder.append(", sentCount=");
+        builder.append(sentCount);
+        builder.append(", sentContentSize=");
+        builder.append(sentContentSize);
+        builder.append(", receivedCount=");
+        builder.append(receivedCount);
+        builder.append(", receivedContentSize=");
+        builder.append(receivedContentSize);
+        builder.append(", activeRemotePortCount=");
+        builder.append(activeRemotePortCount);
+        builder.append(", inactiveRemotePortCount=");
+        builder.append(inactiveRemotePortCount);
+        builder.append(", authenticationIssues=");
+        builder.append(authorizationIssues);
+        builder.append("]");
+        return builder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java
new file mode 100644
index 0000000..1b7c43d
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+/**
+ *
+ */
+public enum RunStatus {
+
+    Running,
+    Stopped,
+    Invalid,
+    Disabled;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java
new file mode 100644
index 0000000..6d7eb12
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status;
+
+public enum TransmissionStatus {
+
+    Transmitting,
+    NotTransmitting;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
new file mode 100644
index 0000000..4628a28
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+
+/**
+ * A repository for storing and retrieving components' historical status
+ * information
+ */
+public interface ComponentStatusRepository {
+
+    /**
+     * Captures the status information provided in the given report
+     *
+     * @param rootGroupStatus status of root group
+     */
+    void capture(ProcessGroupStatus rootGroupStatus);
+
+    /**
+     * Captures the status information provided in the given report, providing a
+     * timestamp that indicates the time at which the status report was
+     * generated. This can be used to replay historical values.
+     *
+     * @param rootGroupStatus status
+     * @param timestamp timestamp of capture
+     */
+    void capture(ProcessGroupStatus rootGroupStatus, Date timestamp);
+
+    /**
+     * @return the Date at which the latest capture was performed
+     */
+    Date getLastCaptureDate();
+
+    /**
+     * @param connectionId the ID of the Connection for which the Status is
+     * desired
+     * @param start the earliest date for which status information should be
+     * returned; if <code>null</code>, the start date should be assumed to be
+     * the beginning of time
+     * @param end the latest date for which status information should be
+     * returned; if <code>null</code>, the end date should be assumed to be the
+     * current time
+     * @param preferredDataPoints the preferred number of data points to return.
+     * If the date range is large, the total number of data points could be far
+     * too many to process. Therefore, this parameter allows the requestor to
+     * indicate how many samples to return.
+     * @return a {@link StatusHistory} that provides the status information
+     * about the Connection with the given ID during the given time period
+     */
+    StatusHistory getConnectionStatusHistory(String connectionId, Date start, Date end, int preferredDataPoints);
+
+    /**
+     * @param processGroupId of group to get status of
+     * @param start the earliest date for which status information should be
+     * returned; if <code>null</code>, the start date should be assumed to be
+     * the beginning of time
+     * @param end the latest date for which status information should be
+     * returned; if <code>null</code>, the end date should be assumed to be the
+     * current time
+     * @param preferredDataPoints the preferred number of data points to return.
+     * If the date range is large, the total number of data points could be far
+     * too many to process. Therefore, this parameter allows the requestor to
+     * indicate how many samples to return.
+     * @return a {@link StatusHistory} that provides the status information
+     * about the Process Group with the given ID during the given time period
+     */
+    StatusHistory getProcessGroupStatusHistory(String processGroupId, Date start, Date end, int preferredDataPoints);
+
+    /**
+     * @param processorId to get status of
+     * @param start the earliest date for which status information should be
+     * returned; if <code>null</code>, the start date should be assumed to be
+     * the beginning of time
+     * @param end the latest date for which status information should be
+     * returned; if <code>null</code>, the end date should be assumed to be the
+     * current time
+     * @param preferredDataPoints the preferred number of data points to return.
+     * If the date range is large, the total number of data points could be far
+     * too many to process. Therefore, this parameter allows the requestor to
+     * indicate how many samples to return.
+     * @return a {@link StatusHistory} that provides the status information
+     * about the Processor with the given ID during the given time period
+     */
+    StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints);
+
+    /**
+     * @param remoteGroupId to get history of
+     * @param start the earliest date for which status information should be
+     * returned; if <code>null</code>, the start date should be assumed to be
+     * the beginning of time
+     * @param end the latest date for which status information should be
+     * returned; if <code>null</code>, the end date should be assumed to be the
+     * current time
+     * @param preferredDataPoints the preferred number of data points to return.
+     * If the date range is large, the total number of data points could be far
+     * too many to process. Therefore, this parameter allows the requestor to
+     * indicate how many samples to return.
+     * @return a {@link StatusHistory} that provides the status information
+     * about the Remote Process Group with the given ID during the given time
+     * period
+     */
+    StatusHistory getRemoteProcessGroupStatusHistory(String remoteGroupId, Date start, Date end, int preferredDataPoints);
+
+    /**
+     * @return a List of all {@link MetricDescriptor}s that are applicable to
+     * Process Groups
+     */
+    List<MetricDescriptor<ProcessGroupStatus>> getProcessGroupMetricDescriptors();
+
+    /**
+     * @return a List of all {@link MetricDescriptor}s that are applicable to
+     * Processors
+     */
+    List<MetricDescriptor<ProcessorStatus>> getProcessorMetricDescriptors();
+
+    /**
+     * @return a List of all {@link MetricDescriptor}s that are applicable to
+     * Remote Process Groups
+     */
+    List<MetricDescriptor<RemoteProcessGroupStatus>> getRemoteProcessGroupMetricDescriptors();
+
+    /**
+     * @return a List of all {@link MetricDescriptor}s that are applicable to
+     * Connections
+     */
+    List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
new file mode 100644
index 0000000..8fdce05
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+/**
+ * Describes a particular metric that is derived from a Status History
+ *
+ * @param <T> type of metric
+ */
+public interface MetricDescriptor<T> {
+
+    public enum Formatter {
+
+        COUNT,
+        DURATION,
+        DATA_SIZE
+    };
+
+    /**
+     * Specifies how the values should be formatted
+     *
+     * @return formatter for values
+     */
+    Formatter getFormatter();
+
+    /**
+     * @return a human-readable description of the field
+     */
+    String getDescription();
+
+    /**
+     * @return a human-readable label for the field
+     */
+    String getLabel();
+
+    /**
+     * @return the name of a field
+     */
+    String getField();
+
+    /**
+     * @return a {@link ValueMapper} that can be used to extract a value for the
+     * status history
+     */
+    ValueMapper<T> getValueFunction();
+
+    /**
+     * @return a {@link ValueReducer} that can reduce multiple StatusSnapshots
+     * into a single Long value
+     */
+    ValueReducer<StatusSnapshot, Long> getValueReducer();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
new file mode 100644
index 0000000..f1bb946
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents a collection of historical status values for a component
+ */
+public interface StatusHistory {
+
+    /**
+     * @return a Date indicating when this report was generated
+     */
+    Date getDateGenerated();
+
+    /**
+     * @return a Map of component field names and their values. The order in
+     * which these values are displayed is dependent on the natural ordering of
+     * the Map returned
+     */
+    Map<String, String> getComponentDetails();
+
+    /**
+     * @return List of snapshots for a given component
+     */
+    List<StatusSnapshot> getStatusSnapshots();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
new file mode 100644
index 0000000..551ceb2
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * A StatusSnapshot represents a Component's status report at some point in time
+ */
+public interface StatusSnapshot {
+
+    /**
+     * @return the point in time for which the status values were obtained
+     */
+    Date getTimestamp();
+
+    /**
+     * @return a Map of MetricDescriptor to value
+     */
+    Map<MetricDescriptor<?>, Long> getStatusMetrics();
+
+    /**
+     * @return a {@link ValueReducer} that is capable of merging multiple
+     * StatusSnapshot objects into a single one
+     */
+    ValueReducer<StatusSnapshot, StatusSnapshot> getValueReducer();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java
new file mode 100644
index 0000000..8000b3a
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+public interface ValueMapper<S> {
+
+    Long getValue(S status);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java
new file mode 100644
index 0000000..0427da7
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+
+public interface ValueReducer<T, R> {
+
+    R reduce(List<T> values);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java b/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java
new file mode 100644
index 0000000..d645d60
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.events;
+
+import java.io.Serializable;
+
+import org.apache.nifi.reporting.Severity;
+
+/**
+ * Implementations MUST be thread-safe
+ */
+public interface EventReporter extends Serializable {
+
+    void reportEvent(Severity severity, String category, String message);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java
new file mode 100644
index 0000000..ed409ea
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.expression;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public interface AttributeExpression {
+
+    /**
+     * @return Evaluates the expression without providing any FlowFile Attributes. This
+     * will evaluate the expression based only on System Properties and JVM
+     * Environment properties
+     * @throws ProcessException if unable to evaluate
+     */
+    String evaluate() throws ProcessException;
+
+    /**
+     * Evaluates the expression without providing any FlowFile Attributes. This
+     * will evaluate the expression based only on System Properties and JVM
+     * Environment properties but allows the values to be decorated
+     *
+     * @param decorator for attribute value
+     * @return evaluated value
+     * @throws ProcessException if failure in evaluation
+     */
+    String evaluate(AttributeValueDecorator decorator) throws ProcessException;
+
+    /**
+     * Evaluates the expression, providing access to the attributes, file size,
+     * id, etc. of the given FlowFile, as well as System Properties and JVM
+     * Environment properties
+     *
+     * @param flowFile to evaluate
+     * @return evaluated value
+     * @throws ProcessException if failure evaluating
+     */
+    String evaluate(FlowFile flowFile) throws ProcessException;
+
+    /**
+     * Evaluates the expression, providing access to the attributes, file size,
+     * id, etc. of the given FlowFile, as well as System Properties and JVM
+     * Environment properties and allows the values to be decorated
+     *
+     * @param flowFile to evaluate
+     * @param decorator for evaluation
+     * @return evaluated value
+     * @throws ProcessException if failed to evaluate
+     */
+    String evaluate(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException;
+
+    /**
+     * @return the type that is returned by the Expression
+     */
+    ResultType getResultType();
+
+    public static enum ResultType {
+
+        STRING, BOOLEAN, NUMBER, DATE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java
new file mode 100644
index 0000000..4cea248
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.expression;
+
+public interface AttributeValueDecorator {
+
+    /**
+     * Decorates the value of a FlowFile Attribute or System/JVM property in
+     * some way
+     *
+     * @param attributeValue to decorate
+     * @return decorated value
+     */
+    String decorate(String attributeValue);
+}


Mime
View raw message