nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject [08/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces and removes use of raw pointers for user facing API.
Date Tue, 28 Mar 2017 17:19:13 GMT
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenSyslog.cpp b/libminifi/src/ListenSyslog.cpp
deleted file mode 100644
index f2901e0..0000000
--- a/libminifi/src/ListenSyslog.cpp
+++ /dev/null
@@ -1,343 +0,0 @@
-/**
- * @file ListenSyslog.cpp
- * ListenSyslog class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <queue>
-#include <stdio.h>
-#include <string>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "ListenSyslog.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string ListenSyslog::ProcessorName("ListenSyslog");
-Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B");
-Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB");
-Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2");
-Property ListenSyslog::MaxBatchSize("Max Batch Size",
-		"The maximum number of Syslog events to add to a single FlowFile.", "1");
-Property ListenSyslog::MessageDelimiter("Message Delimiter",
-		"Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n");
-Property ListenSyslog::ParseMessages("Parse Messages",
-		"Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false");
-Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP");
-Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514");
-Relationship ListenSyslog::Success("success", "All files are routed to success");
-Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
-
-void ListenSyslog::initialize()
-{
-	//! Set the supported properties
-	std::set<Property> properties;
-	properties.insert(RecvBufSize);
-	properties.insert(MaxSocketBufSize);
-	properties.insert(MaxConnections);
-	properties.insert(MaxBatchSize);
-	properties.insert(MessageDelimiter);
-	properties.insert(ParseMessages);
-	properties.insert(Protocol);
-	properties.insert(Port);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<Relationship> relationships;
-	relationships.insert(Success);
-	relationships.insert(Invalid);
-	setSupportedRelationships(relationships);
-}
-
-void ListenSyslog::startSocketThread()
-{
-	if (_thread != NULL)
-		return;
-
-	logger_->log_info("ListenSysLog Socket Thread Start");
-	_serverTheadRunning = true;
-	_thread = new std::thread(run, this);
-	_thread->detach();
-}
-
-void ListenSyslog::run(ListenSyslog *process)
-{
-	process->runThread();
-}
-
-void ListenSyslog::runThread()
-{
-	while (_serverTheadRunning)
-	{
-		if (_resetServerSocket)
-		{
-			_resetServerSocket = false;
-			// need to reset the socket
-			std::vector<int>::iterator it;
-			for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
-			{
-				int clientSocket = *it;
-				close(clientSocket);
-			}
-			_clientSockets.clear();
-			if (_serverSocket > 0)
-			{
-				close(_serverSocket);
-				_serverSocket = 0;
-			}
-		}
-
-		if (_serverSocket <= 0)
-		{
-			uint16_t portno = _port;
-			struct sockaddr_in serv_addr;
-			int sockfd;
-			if (_protocol == "TCP")
-				sockfd = socket(AF_INET, SOCK_STREAM, 0);
-			else
-				sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-			if (sockfd < 0)
-			{
-				logger_->log_info("ListenSysLog Server socket creation failed");
-				break;
-			}
-			bzero((char *) &serv_addr, sizeof(serv_addr));
-			serv_addr.sin_family = AF_INET;
-			serv_addr.sin_addr.s_addr = INADDR_ANY;
-			serv_addr.sin_port = htons(portno);
-			if (bind(sockfd, (struct sockaddr *) &serv_addr,
-					sizeof(serv_addr)) < 0)
-			{
-				logger_->log_error("ListenSysLog Server socket bind failed");
-				break;
-			}
-			if (_protocol == "TCP")
-				listen(sockfd,5);
-			_serverSocket = sockfd;
-			logger_->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
-		}
-		FD_ZERO(&_readfds);
-		FD_SET(_serverSocket, &_readfds);
-		_maxFds = _serverSocket;
-		std::vector<int>::iterator it;
-		for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
-		{
-			int clientSocket = *it;
-			if (clientSocket >= _maxFds)
-				_maxFds = clientSocket;
-			FD_SET(clientSocket, &_readfds);
-		}
-		fd_set fds;
-		struct timeval tv;
-		int retval;
-		fds = _readfds;
-		tv.tv_sec = 0;
-		// 100 msec
-		tv.tv_usec = 100000;
-		retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
-		if (retval < 0)
-			break;
-		if (retval == 0)
-			continue;
-		if (FD_ISSET(_serverSocket, &fds))
-		{
-			// server socket, either we have UDP datagram or TCP connection request
-			if (_protocol == "TCP")
-			{
-				socklen_t clilen;
-				struct sockaddr_in cli_addr;
-				clilen = sizeof(cli_addr);
-				int newsockfd = accept(_serverSocket,
-						(struct sockaddr *) &cli_addr,
-						&clilen);
-				if (newsockfd > 0)
-				{
-					if (_clientSockets.size() < _maxConnections)
-					{
-						_clientSockets.push_back(newsockfd);
-						logger_->log_info("ListenSysLog new client socket %d connection", newsockfd);
-						continue;
-					}
-					else
-					{
-						close(newsockfd);
-					}
-				}
-			}
-			else
-			{
-				socklen_t clilen;
-				struct sockaddr_in cli_addr;
-				clilen = sizeof(cli_addr);
-				int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
-						(struct sockaddr *)&cli_addr, &clilen);
-				if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
-				{
-					uint8_t *payload = new uint8_t[recvlen];
-					memcpy(payload, _buffer, recvlen);
-					putEvent(payload, recvlen);
-				}
-			}
-		}
-		it = _clientSockets.begin();
-		while (it != _clientSockets.end())
-		{
-			int clientSocket = *it;
-			if (FD_ISSET(clientSocket, &fds))
-			{
-				int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
-				if (recvlen <= 0)
-				{
-					close(clientSocket);
-					logger_->log_info("ListenSysLog client socket %d close", clientSocket);
-					it = _clientSockets.erase(it);
-				}
-				else
-				{
-					if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
-					{
-						uint8_t *payload = new uint8_t[recvlen];
-						memcpy(payload, _buffer, recvlen);
-						putEvent(payload, recvlen);
-					}
-					++it;
-				}
-			}
-		}
-	}
-	return;
-}
-
-
-int ListenSyslog::readline( int fd, char *bufptr, size_t len )
-{
-	char *bufx = bufptr;
-	static char *bp;
-	static int cnt = 0;
-	static char b[ 2048 ];
-	char c;
-
-	while ( --len > 0 )
-    {
-      if ( --cnt <= 0 )
-      {
-    	  cnt = recv( fd, b, sizeof( b ), 0 );
-    	  if ( cnt < 0 )
-    	  {
-    		  if ( errno == EINTR )
-    		  {
-    			  len++;		/* the while will decrement */
-    			  continue;
-    		  }
-    		  return -1;
-    	  }
-    	  if ( cnt == 0 )
-    		  return 0;
-    	  bp = b;
-      }
-      c = *bp++;
-      *bufptr++ = c;
-      if ( c == '\n' )
-      {
-    	  *bufptr = '\n';
-    	  return bufptr - bufx + 1;
-      }
-    }
-	return -1;
-}
-
-void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-	std::string value;
-	bool needResetServerSocket = false;
-	if (context->getProperty(Protocol.getName(), value))
-	{
-		if (_protocol != value)
-			needResetServerSocket = true;
-		_protocol = value;
-	}
-	if (context->getProperty(RecvBufSize.getName(), value))
-	{
-		Property::StringToInt(value, _recvBufSize);
-	}
-	if (context->getProperty(MaxSocketBufSize.getName(), value))
-	{
-		Property::StringToInt(value, _maxSocketBufSize);
-	}
-	if (context->getProperty(MaxConnections.getName(), value))
-	{
-		Property::StringToInt(value, _maxConnections);
-	}
-	if (context->getProperty(MessageDelimiter.getName(), value))
-	{
-		_messageDelimiter = value;
-	}
-	if (context->getProperty(ParseMessages.getName(), value))
-	{
-		StringUtils::StringToBool(value, _parseMessages);
-	}
-	if (context->getProperty(Port.getName(), value))
-	{
-		int64_t oldPort = _port;
-		Property::StringToInt(value, _port);
-		if (_port != oldPort)
-			needResetServerSocket = true;
-	}
-	if (context->getProperty(MaxBatchSize.getName(), value))
-	{
-		Property::StringToInt(value, _maxBatchSize);
-	}
-
-	if (needResetServerSocket)
-		_resetServerSocket = true;
-
-	startSocketThread();
-
-	// read from the event queue
-	if (isEventQueueEmpty())
-	{
-		context->yield();
-		return;
-	}
-
-	std::queue<SysLogEvent> eventQueue;
-	pollEvent(eventQueue, _maxBatchSize);
-	bool firstEvent = true;
-	FlowFileRecord *flowFile = NULL;
-	while(!eventQueue.empty())
-	{
-		SysLogEvent event = eventQueue.front();
-		eventQueue.pop();
-		if (firstEvent)
-		{
-			flowFile = session->create();
-			if (!flowFile)
-				return;
-			ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
-			session->write(flowFile, &callback);
-			delete[] event.payload;
-			firstEvent = false;
-		}
-		else
-		{
-			ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
-			session->append(flowFile, &callback);
-			delete[] event.payload;
-		}
-	}
-	flowFile->addAttribute("syslog.protocol", _protocol);
-	flowFile->addAttribute("syslog.port", std::to_string(_port));
-	session->transfer(flowFile, Success);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/LogAppenders.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/LogAppenders.cpp b/libminifi/src/LogAppenders.cpp
deleted file mode 100644
index c90588d..0000000
--- a/libminifi/src/LogAppenders.cpp
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "../include/LogAppenders.h"
-
-const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr";
-
-const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file";
-const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files";
-const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/LogAttribute.cpp b/libminifi/src/LogAttribute.cpp
deleted file mode 100644
index 345eb69..0000000
--- a/libminifi/src/LogAttribute.cpp
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * @file LogAttribute.cpp
- * LogAttribute class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <sstream>
-#include <string.h>
-#include <iostream>
-
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "LogAttribute.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string LogAttribute::ProcessorName("LogAttribute");
-Property LogAttribute::LogLevel("Log Level", "The Log Level to use when logging the Attributes", "info");
-Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", "");
-Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", "");
-Property LogAttribute::LogPayload("Log Payload",
-		"If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.", "false");
-Property LogAttribute::LogPrefix("Log prefix",
-		"Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", "");
-Relationship LogAttribute::Success("success", "success operational on the flow record");
-
-void LogAttribute::initialize()
-{
-	//! Set the supported properties
-	std::set<Property> properties;
-	properties.insert(LogLevel);
-	properties.insert(AttributesToLog);
-	properties.insert(AttributesToIgnore);
-	properties.insert(LogPayload);
-	properties.insert(LogPrefix);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<Relationship> relationships;
-	relationships.insert(Success);
-	setSupportedRelationships(relationships);
-}
-
-void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-	std::string dashLine = "--------------------------------------------------";
-	LogAttrLevel level = LogAttrLevelInfo;
-	bool logPayload = false;
-	std::ostringstream message;
-
-	FlowFileRecord *flow = session->get();
-
-	if (!flow)
-		return;
-
-	std::string value;
-	if (context->getProperty(LogLevel.getName(), value))
-	{
-		logLevelStringToEnum(value, level);
-	}
-	if (context->getProperty(LogPrefix.getName(), value))
-	{
-		dashLine = "-----" + value + "-----";
-	}
-	if (context->getProperty(LogPayload.getName(), value))
-	{
-		StringUtils::StringToBool(value, logPayload);
-	}
-
-	message << "Logging for flow file " << "\n";
-	message << dashLine;
-	message << "\nStandard FlowFile Attributes";
-	message << "\n" << "UUID:" << flow->getUUIDStr();
-	message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
-	message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
-	message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
-	message << "\nFlowFile Attributes Map Content";
-	std::map<std::string, std::string> attrs = flow->getAttributes();
-    std::map<std::string, std::string>::iterator it;
-    for (it = attrs.begin(); it!= attrs.end(); it++)
-    {
-    	message << "\n" << "key:" << it->first << " value:" << it->second;
-    }
-    message << "\nFlowFile Resource Claim Content";
-    ResourceClaim *claim = flow->getResourceClaim();
-    if (claim)
-    {
-    	message << "\n" << "Content Claim:" << claim->getContentFullPath();
-    }
-    if (logPayload && flow->getSize() <= 1024*1024)
-    {
-    	message << "\n" << "Payload:" << "\n";
-    	ReadCallback callback(flow->getSize());
-    	session->read(flow, &callback);
-    	for (unsigned int i = 0, j = 0; i < callback._readSize; i++)
-    	{
-    		char temp[8];
-    		sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i]));
-    		message << temp;
-    		j++;
-    		if (j == 16)
-    		{
-    			message << '\n';
-    			j = 0;
-    		}
-    	}
-    }
-    message << "\n" << dashLine << std::ends;
-    std::string output = message.str();
-
-    switch (level)
-    {
-    case LogAttrLevelInfo:
-    	logger_->log_info("%s", output.c_str());
-		break;
-    case LogAttrLevelDebug:
-    	logger_->log_debug("%s", output.c_str());
-		break;
-    case LogAttrLevelError:
-    	logger_->log_error("%s", output.c_str());
-		break;
-    case LogAttrLevelTrace:
-    	logger_->log_trace("%s", output.c_str());
-    	break;
-    case LogAttrLevelWarn:
-    	logger_->log_warn("%s", output.c_str());
-    	break;
-    default:
-    	break;
-    }
-
-    // Test Import
-    /*
-    FlowFileRecord *importRecord = session->create();
-    session->import(claim->getContentFullPath(), importRecord);
-    session->transfer(importRecord, Success); */
-
-
-    // Transfer to the relationship
-    session->transfer(flow, Success);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Logger.cpp b/libminifi/src/Logger.cpp
deleted file mode 100644
index e90667d..0000000
--- a/libminifi/src/Logger.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * @file Logger.cpp
- * Logger class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "../include/Logger.h"
-
-#include <vector>
-#include <queue>
-#include <map>
-
-
-std::shared_ptr<Logger> Logger::singleton_logger_(nullptr);
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
deleted file mode 100644
index 7e3527e..0000000
--- a/libminifi/src/ProcessGroup.cpp
+++ /dev/null
@@ -1,307 +0,0 @@
-/**
- * @file ProcessGroup.cpp
- * ProcessGroup class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-
-#include "ProcessGroup.h"
-#include "Processor.h"
-
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
-		ProcessGroup *parent) :
-		name_(name), type_(type), parent_process_group_(parent) {
-	if (!uuid)
-		// Generate the global UUID for the flow record
-		uuid_generate(uuid_);
-	else
-		uuid_copy(uuid_, uuid);
-
-	yield_period_msec_ = 0;
-	transmitting_ = false;
-
-	logger_ = Logger::getLogger();
-	logger_->log_info("ProcessGroup %s created", name_.c_str());
-}
-
-ProcessGroup::~ProcessGroup() {
-	for (std::set<Connection *>::iterator it = connections_.begin();
-			it != connections_.end(); ++it) {
-		Connection *connection = *it;
-		connection->drain();
-		delete connection;
-	}
-
-	for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
-			it != child_process_groups_.end(); ++it) {
-		ProcessGroup *processGroup(*it);
-		delete processGroup;
-	}
-
-	for (std::set<Processor *>::iterator it = processors_.begin();
-			it != processors_.end(); ++it) {
-		Processor *processor(*it);
-		delete processor;
-	}
-}
-
-void ProcessGroup::getConnections(std::map<std::string, Connection*> *connectionMap)
-{
-	for (auto connection : connections_)
-	{
-		(*connectionMap)[connection->getUUIDStr()] = connection;
-	}
-
-	for (auto processGroup: child_process_groups_) {
-		processGroup->getConnections(connectionMap);
-	}
-}
-
-bool ProcessGroup::isRootProcessGroup() {
-	std::lock_guard<std::mutex> lock(mtx_);
-	return (type_ == ROOT_PROCESS_GROUP);
-}
-
-void ProcessGroup::addProcessor(Processor *processor) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	if (processors_.find(processor) == processors_.end()) {
-		// We do not have the same processor in this process group yet
-		processors_.insert(processor);
-		logger_->log_info("Add processor %s into process group %s",
-				processor->getName().c_str(), name_.c_str());
-	}
-}
-
-void ProcessGroup::removeProcessor(Processor *processor) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	if (processors_.find(processor) != processors_.end()) {
-		// We do have the same processor in this process group yet
-		processors_.erase(processor);
-		logger_->log_info("Remove processor %s from process group %s",
-				processor->getName().c_str(), name_.c_str());
-	}
-}
-
-void ProcessGroup::addProcessGroup(ProcessGroup *child) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	if (child_process_groups_.find(child) == child_process_groups_.end()) {
-		// We do not have the same child process group in this process group yet
-		child_process_groups_.insert(child);
-		logger_->log_info("Add child process group %s into process group %s",
-				child->getName().c_str(), name_.c_str());
-	}
-}
-
-void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	if (child_process_groups_.find(child) != child_process_groups_.end()) {
-		// We do have the same child process group in this process group yet
-		child_process_groups_.erase(child);
-		logger_->log_info("Remove child process group %s from process group %s",
-				child->getName().c_str(), name_.c_str());
-	}
-}
-
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
-		EventDrivenSchedulingAgent *eventScheduler) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	try {
-		// Start all the processor node, input and output ports
-		for (auto processor : processors_) {
-			logger_->log_debug("Starting %s", processor->getName().c_str());
-
-			if (!processor->isRunning()
-					&& processor->getScheduledState() != DISABLED) {
-				if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-					timeScheduler->schedule(processor);
-				else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
-					eventScheduler->schedule(processor);
-			}
-		}
-		// Start processing the group
-		for (auto processGroup : child_process_groups_) {
-			processGroup->startProcessing(timeScheduler, eventScheduler);
-		}
-	} catch (std::exception &exception) {
-		logger_->log_debug("Caught Exception %s", exception.what());
-		throw;
-	} catch (...) {
-		logger_->log_debug(
-				"Caught Exception during process group start processing");
-		throw;
-	}
-}
-
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
-		EventDrivenSchedulingAgent *eventScheduler) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	try {
-		// Stop all the processor node, input and output ports
-		for (std::set<Processor *>::iterator it = processors_.begin();
-				it != processors_.end(); ++it) {
-			Processor *processor(*it);
-			if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-				timeScheduler->unschedule(processor);
-			else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
-				eventScheduler->unschedule(processor);
-		}
-
-		for (std::set<ProcessGroup *>::iterator it =
-				child_process_groups_.begin(); it != child_process_groups_.end();
-				++it) {
-			ProcessGroup *processGroup(*it);
-			processGroup->stopProcessing(timeScheduler, eventScheduler);
-		}
-	} catch (std::exception &exception) {
-		logger_->log_debug("Caught Exception %s", exception.what());
-		throw;
-	} catch (...) {
-		logger_->log_debug(
-				"Caught Exception during process group stop processing");
-		throw;
-	}
-}
-
-Processor *ProcessGroup::findProcessor(uuid_t uuid) {
-
-	Processor *ret = NULL;
-	// std::lock_guard<std::mutex> lock(_mtx);
-
-	for(auto processor : processors_){
-		logger_->log_info("find processor %s", processor->getName().c_str());
-		uuid_t processorUUID;
-
-		if (processor->getUUID(processorUUID)) {
-
-			char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
-			uuid_unparse_lower(processorUUID, uuid_str);
-			std::string processorUUIDstr = uuid_str;
-			uuid_unparse_lower(uuid, uuid_str);
-			std::string uuidStr = uuid_str;
-			if (processorUUIDstr == uuidStr) {
-				return processor;
-			}
-		}
-
-	}
-	for(auto processGroup : child_process_groups_){
-
-		logger_->log_info("find processor child %s",
-				processGroup->getName().c_str());
-		Processor *processor = processGroup->findProcessor(uuid);
-		if (processor)
-			return processor;
-	}
-
-	return ret;
-}
-
-Processor *ProcessGroup::findProcessor(std::string processorName) {
-	Processor *ret = NULL;
-
-	for(auto processor : processors_){
-		logger_->log_debug("Current processor is %s",
-				processor->getName().c_str());
-		if (processor->getName() == processorName)
-			return processor;
-	}
-
-	for(auto processGroup : child_process_groups_){
-		Processor *processor = processGroup->findProcessor(processorName);
-		if (processor)
-			return processor;
-	}
-
-	return ret;
-}
-
-void ProcessGroup::updatePropertyValue(std::string processorName,
-		std::string propertyName, std::string propertyValue) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	for(auto processor : processors_){
-		if (processor->getName() == processorName) {
-			processor->setProperty(propertyName, propertyValue);
-		}
-	}
-
-	for(auto processGroup : child_process_groups_){
-		processGroup->updatePropertyValue(processorName, propertyName,
-				propertyValue);
-	}
-
-	return;
-}
-
-void ProcessGroup::addConnection(Connection *connection) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	if (connections_.find(connection) == connections_.end()) {
-		// We do not have the same connection in this process group yet
-		connections_.insert(connection);
-		logger_->log_info("Add connection %s into process group %s",
-				connection->getName().c_str(), name_.c_str());
-		uuid_t sourceUUID;
-		Processor *source = NULL;
-		connection->getSourceProcessorUUID(sourceUUID);
-		source = this->findProcessor(sourceUUID);
-		if (source)
-			source->addConnection(connection);
-		Processor *destination = NULL;
-		uuid_t destinationUUID;
-		connection->getDestinationProcessorUUID(destinationUUID);
-		destination = this->findProcessor(destinationUUID);
-		if (destination && destination != source)
-			destination->addConnection(connection);
-	}
-}
-
-void ProcessGroup::removeConnection(Connection *connection) {
-	std::lock_guard<std::mutex> lock(mtx_);
-
-	if (connections_.find(connection) != connections_.end()) {
-		// We do not have the same connection in this process group yet
-		connections_.erase(connection);
-		logger_->log_info("Remove connection %s into process group %s",
-				connection->getName().c_str(), name_.c_str());
-		uuid_t sourceUUID;
-		Processor *source = NULL;
-		connection->getSourceProcessorUUID(sourceUUID);
-		source = this->findProcessor(sourceUUID);
-		if (source)
-			source->removeConnection(connection);
-		Processor *destination = NULL;
-		uuid_t destinationUUID;
-		connection->getDestinationProcessorUUID(destinationUUID);
-		destination = this->findProcessor(destinationUUID);
-		if (destination && destination != source)
-			destination->removeConnection(connection);
-	}
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
deleted file mode 100644
index 3b3eb64..0000000
--- a/libminifi/src/ProcessSession.cpp
+++ /dev/null
@@ -1,790 +0,0 @@
-/**
- * @file ProcessSession.cpp
- * ProcessSession class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <iostream>
-
-#include "ProcessSession.h"
-#include "FlowController.h"
-
-ProcessSession::ProcessSession(ProcessContext *processContext) : _processContext(processContext) {
-	logger_ = Logger::getLogger();
-	logger_->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str());
-	_provenanceReport = NULL;
-	if (FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable())
-	{
-		_provenanceReport = new ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(),
-					_processContext->getProcessor()->getName());
-	}
-}
-
-FlowFileRecord* ProcessSession::create()
-{
-	std::map<std::string, std::string> empty;
-	FlowFileRecord *record = new FlowFileRecord(empty);
-
-	if (record)
-	{
-		_addedFlowFiles[record->getUUIDStr()] = record;
-		logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
-		std::string details = _processContext->getProcessor()->getName() + " creates flow record " +  record->getUUIDStr();
-		if (_provenanceReport)
-			_provenanceReport->create(record, details);
-	}
-
-	return record;
-}
-
-FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
-{
-	std::map<std::string, std::string> empty;
-	FlowFileRecord *record = new FlowFileRecord(empty);
-
-	if (record)
-	{
-		_addedFlowFiles[record->getUUIDStr()] = record;
-		logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
-	}
-
-	if (record)
-	{
-		// Copy attributes
-		std::map<std::string, std::string> parentAttributes = parent->getAttributes();
-	    std::map<std::string, std::string>::iterator it;
-	    for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++)
-	    {
-	    	if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
-	    			it->first == FlowAttributeKey(DISCARD_REASON) ||
-					it->first == FlowAttributeKey(UUID))
-	    		// Do not copy special attributes from parent
-	    		continue;
-	    	record->setAttribute(it->first, it->second);
-	    }
-	    record->_lineageStartDate = parent->_lineageStartDate;
-	    record->_lineageIdentifiers = parent->_lineageIdentifiers;
-	    record->_lineageIdentifiers.insert(parent->_uuidStr);
-
-	}
-	return record;
-}
-
-FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
-{
-	FlowFileRecord *record = this->create(parent);
-	if (record)
-	{
-		// Copy Resource Claim
-		record->_claim = parent->_claim;
-		if (record->_claim)
-		{
-			record->_offset = parent->_offset;
-			record->_size = parent->_size;
-			record->_claim->increaseFlowFileRecordOwnedCount();
-		}
-		if (_provenanceReport)
-			_provenanceReport->clone(parent, record);
-	}
-	return record;
-}
-
-FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
-{
-	std::map<std::string, std::string> empty;
-	FlowFileRecord *record = new FlowFileRecord(empty);
-
-	if (record)
-	{
-		this->_clonedFlowFiles[record->getUUIDStr()] = record;
-		logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str());
-		// Copy attributes
-		std::map<std::string, std::string> parentAttributes = parent->getAttributes();
-		std::map<std::string, std::string>::iterator it;
-		for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++)
-		{
-			if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
-	    			it->first == FlowAttributeKey(DISCARD_REASON) ||
-					it->first == FlowAttributeKey(UUID))
-	    		// Do not copy special attributes from parent
-	    		continue;
-	    	record->setAttribute(it->first, it->second);
-	    }
-	    record->_lineageStartDate = parent->_lineageStartDate;
-	    record->_lineageIdentifiers = parent->_lineageIdentifiers;
-	    record->_lineageIdentifiers.insert(parent->_uuidStr);
-
-	    // Copy Resource Claim
-	    record->_claim = parent->_claim;
-	    if (record->_claim)
-	    {
-	    	record->_offset = parent->_offset;
-	    	record->_size = parent->_size;
-	    	record->_claim->increaseFlowFileRecordOwnedCount();
-	    }
-	    if (_provenanceReport)
-	    	_provenanceReport->clone(parent, record);
-	}
-
-	return record;
-}
-
-FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size)
-{
-	FlowFileRecord *record = this->create(parent);
-	if (record)
-	{
-		if (parent->_claim)
-		{
-			if ((offset + size) > (long) parent->_size)
-			{
-				// Set offset and size
-				logger_->log_error("clone offset %d and size %d exceed parent size %d",
-						offset, size, parent->_size);
-				// Remove the Add FlowFile for the session
-				std::map<std::string, FlowFileRecord *>::iterator it =
-						this->_addedFlowFiles.find(record->getUUIDStr());
-				if (it != this->_addedFlowFiles.end())
-					this->_addedFlowFiles.erase(record->getUUIDStr());
-				delete record;
-				return NULL;
-			}
-			record->_offset = parent->_offset + parent->_offset;
-			record->_size = size;
-			// Copy Resource Claim
-			record->_claim = parent->_claim;
-			record->_claim->increaseFlowFileRecordOwnedCount();
-		}
-		if (_provenanceReport)
-			_provenanceReport->clone(parent, record);
-	}
-	return record;
-}
-
-void ProcessSession::remove(FlowFileRecord *flow)
-{
-	flow->_markedDelete = true;
-	_deletedFlowFiles[flow->getUUIDStr()] = flow;
-	std::string reason = _processContext->getProcessor()->getName() + " drop flow record " +  flow->getUUIDStr();
-	if (_provenanceReport)
-		_provenanceReport->drop(flow, reason);
-}
-
-void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
-{
-	flow->setAttribute(key, value);
-	std::string details = _processContext->getProcessor()->getName() + " modify flow record " +  flow->getUUIDStr() +
-			" attribute " + key + ":" + value;
-	if (_provenanceReport)
-		_provenanceReport->modifyAttributes(flow, details);
-}
-
-void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
-{
-	flow->removeAttribute(key);
-	std::string details = _processContext->getProcessor()->getName() + " remove flow record " +  flow->getUUIDStr() +
-				" attribute " + key;
-	if (_provenanceReport)
-		_provenanceReport->modifyAttributes(flow, details);
-}
-
-void ProcessSession::penalize(FlowFileRecord *flow)
-{
-	flow->_penaltyExpirationMs = getTimeMillis() + this->_processContext->getProcessor()->getPenalizationPeriodMsec();
-}
-
-void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
-{
-	_transferRelationship[flow->getUUIDStr()] = relationship;
-}
-
-void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
-{
-	ResourceClaim *claim = NULL;
-
-	claim = new ResourceClaim();
-
-	try
-	{
-		std::ofstream fs;
-		uint64_t startTime = getTimeMillis();
-		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-		if (fs.is_open())
-		{
-			// Call the callback to write the content
-			callback->process(&fs);
-			if (fs.good() && fs.tellp() >= 0)
-			{
-				flow->_size = fs.tellp();
-				flow->_offset = 0;
-				if (flow->_claim)
-				{
-					// Remove the old claim
-					flow->_claim->decreaseFlowFileRecordOwnedCount();
-					flow->_claim = NULL;
-				}
-				flow->_claim = claim;
-				claim->increaseFlowFileRecordOwnedCount();
-				/*
-				logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
-						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-				fs.close();
-				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
-				uint64_t endTime = getTimeMillis();
-				if (_provenanceReport)
-					_provenanceReport->modifyContent(flow, details, endTime - startTime);
-			}
-			else
-			{
-				fs.close();
-				throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-			}
-		}
-		else
-		{
-			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
-		}
-	}
-	catch (std::exception &exception)
-	{
-		if (flow && flow->_claim == claim)
-		{
-			flow->_claim->decreaseFlowFileRecordOwnedCount();
-			flow->_claim = NULL;
-		}
-		if (claim)
-			delete claim;
-		logger_->log_debug("Caught Exception %s", exception.what());
-		throw;
-	}
-	catch (...)
-	{
-		if (flow && flow->_claim == claim)
-		{
-			flow->_claim->decreaseFlowFileRecordOwnedCount();
-			flow->_claim = NULL;
-		}
-		if (claim)
-			delete claim;
-		logger_->log_debug("Caught Exception during process session write");
-		throw;
-	}
-}
-
-void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback)
-{
-	ResourceClaim *claim = NULL;
-
-	if (flow->_claim == NULL)
-	{
-		// No existed claim for append, we need to create new claim
-		return write(flow, callback);
-	}
-
-	claim = flow->_claim;
-
-	try
-	{
-		std::ofstream fs;
-		uint64_t startTime = getTimeMillis();
-		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
-		if (fs.is_open())
-		{
-			// Call the callback to write the content
-			std::streampos oldPos = fs.tellp();
-			callback->process(&fs);
-			if (fs.good() && fs.tellp() >= 0)
-			{
-				uint64_t appendSize = fs.tellp() - oldPos;
-				flow->_size += appendSize;
-				/*
-				logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
-						flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-				fs.close();
-				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
-				uint64_t endTime = getTimeMillis();
-				if (_provenanceReport)
-					_provenanceReport->modifyContent(flow, details, endTime - startTime);
-			}
-			else
-			{
-				fs.close();
-				throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-			}
-		}
-		else
-		{
-			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
-		}
-	}
-	catch (std::exception &exception)
-	{
-		logger_->log_debug("Caught Exception %s", exception.what());
-		throw;
-	}
-	catch (...)
-	{
-		logger_->log_debug("Caught Exception during process session append");
-		throw;
-	}
-}
-
-void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
-{
-	try
-	{
-		ResourceClaim *claim = NULL;
-		if (flow->_claim == NULL)
-		{
-			// No existed claim for read, we throw exception
-			throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
-		}
-
-		claim = flow->_claim;
-		std::ifstream fs;
-		fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
-		if (fs.is_open())
-		{
-			fs.seekg(flow->_offset, fs.beg);
-
-			if (fs.good())
-			{
-				callback->process(&fs);
-				/*
-				logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
-						flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-				fs.close();
-			}
-			else
-			{
-				fs.close();
-				throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
-			}
-		}
-		else
-		{
-			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
-		}
-	}
-	catch (std::exception &exception)
-	{
-		logger_->log_debug("Caught Exception %s", exception.what());
-		throw;
-	}
-	catch (...)
-	{
-		logger_->log_debug("Caught Exception during process session read");
-		throw;
-	}
-}
-
-void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset)
-{
-	ResourceClaim *claim = NULL;
-
-	claim = new ResourceClaim();
-	char *buf = NULL;
-	int size = 4096;
-	buf = new char [size];
-
-	try
-	{
-		std::ofstream fs;
-		uint64_t startTime = getTimeMillis();
-		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
-		std::ifstream input;
-		input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-
-		if (fs.is_open() && input.is_open())
-		{
-			// Open the source file and stream to the flow file
-			input.seekg(offset, fs.beg);
-			while (input.good())
-			{
-				input.read(buf, size);
-				if (input)
-					fs.write(buf, size);
-				else
-					fs.write(buf, input.gcount());
-			}
-
-			if (fs.good() && fs.tellp() >= 0)
-			{
-				flow->_size = fs.tellp();
-				flow->_offset = 0;
-				if (flow->_claim)
-				{
-					// Remove the old claim
-					flow->_claim->decreaseFlowFileRecordOwnedCount();
-					flow->_claim = NULL;
-				}
-				flow->_claim = claim;
-				claim->increaseFlowFileRecordOwnedCount();
-
-				logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s",
-						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
-
-				fs.close();
-				input.close();
-				if (!keepSource)
-					std::remove(source.c_str());
-				std::string details = _processContext->getProcessor()->getName() + " modify flow record content " +  flow->getUUIDStr();
-				uint64_t endTime = getTimeMillis();
-				if (_provenanceReport)
-					_provenanceReport->modifyContent(flow, details, endTime - startTime);
-			}
-			else
-			{
-				fs.close();
-				input.close();
-				throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
-			}
-		}
-		else
-		{
-			throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
-		}
-
-		delete[] buf;
-	}
-	catch (std::exception &exception)
-	{
-		if (flow && flow->_claim == claim)
-		{
-			flow->_claim->decreaseFlowFileRecordOwnedCount();
-			flow->_claim = NULL;
-		}
-		if (claim)
-			delete claim;
-		logger_->log_debug("Caught Exception %s", exception.what());
-		delete[] buf;
-		throw;
-	}
-	catch (...)
-	{
-		if (flow && flow->_claim == claim)
-		{
-			flow->_claim->decreaseFlowFileRecordOwnedCount();
-			flow->_claim = NULL;
-		}
-		if (claim)
-			delete claim;
-		logger_->log_debug("Caught Exception during process session write");
-		delete[] buf;
-		throw;
-	}
-}
-
-void ProcessSession::commit()
-{
-	try
-	{
-		// First we clone the flow record based on the transfered relationship for updated flow record
-		for (auto && it : _updatedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			if (record->_markedDelete)
-				continue;
-			std::map<std::string, Relationship>::iterator itRelationship =
-					this->_transferRelationship.find(record->getUUIDStr());
-			if (itRelationship != _transferRelationship.end())
-			{
-				Relationship relationship = itRelationship->second;
-				// Find the relationship, we need to find the connections for that relationship
-				std::set<Connection *> connections =
-						_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
-				if (connections.empty())
-				{
-					// No connection
-					if (!_processContext->getProcessor()->isAutoTerminated(relationship))
-					{
-						// Not autoterminate, we should have the connect
-						std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
-						throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
-					}
-					else
-					{
-						// Autoterminated
-						remove(record);
-					}
-				}
-				else
-				{
-					// We connections, clone the flow and assign the connection accordingly
-					for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection)
-					{
-						Connection *connection(*itConnection);
-						if (itConnection == connections.begin())
-						{
-							// First connection which the flow need be routed to
-							record->_connection = connection;
-						}
-						else
-						{
-							// Clone the flow file and route to the connection
-							FlowFileRecord *cloneRecord;
-							cloneRecord = this->cloneDuringTransfer(record);
-							if (cloneRecord)
-								cloneRecord->_connection = connection;
-							else
-								throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
-						}
-					}
-				}
-			}
-			else
-			{
-				// Can not find relationship for the flow
-				throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
-			}
-		}
-		// Do the samething for added flow file
-		for(const auto it : _addedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			if (record->_markedDelete)
-				continue;
-			std::map<std::string, Relationship>::iterator itRelationship =
-					this->_transferRelationship.find(record->getUUIDStr());
-			if (itRelationship != _transferRelationship.end())
-			{
-				Relationship relationship = itRelationship->second;
-				// Find the relationship, we need to find the connections for that relationship
-				std::set<Connection *> connections =
-						_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
-				if (connections.empty())
-				{
-					// No connection
-					if (!_processContext->getProcessor()->isAutoTerminated(relationship))
-					{
-						// Not autoterminate, we should have the connect
-						std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
-						throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
-					}
-					else
-					{
-						// Autoterminated
-						remove(record);
-					}
-				}
-				else
-				{
-					// We connections, clone the flow and assign the connection accordingly
-					for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection)
-					{
-						Connection *connection(*itConnection);
-						if (itConnection == connections.begin())
-						{
-							// First connection which the flow need be routed to
-							record->_connection = connection;
-						}
-						else
-						{
-							// Clone the flow file and route to the connection
-							FlowFileRecord *cloneRecord;
-							cloneRecord = this->cloneDuringTransfer(record);
-							if (cloneRecord)
-								cloneRecord->_connection = connection;
-							else
-								throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
-						}
-					}
-				}
-			}
-			else
-			{
-				// Can not find relationship for the flow
-				throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
-			}
-		}
-		// Complete process the added and update flow files for the session, send the flow file to its queue
-		for(const auto &it : _updatedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			if (record->_markedDelete)
-			{
-				continue;
-			}
-			if (record->_connection)
-				record->_connection->put(record);
-			else
-				delete record;
-		}
-		for(const auto &it : _addedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			if (record->_markedDelete)
-			{
-				continue;
-			}
-			if (record->_connection)
-				record->_connection->put(record);
-			else
-				delete record;
-		}
-		// Process the clone flow files
-		for(const auto &it : _clonedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			if (record->_markedDelete)
-			{
-				continue;
-			}
-			if (record->_connection)
-				record->_connection->put(record);
-			else
-				delete record;
-		}
-		// Delete the deleted flow files
-		for(const auto &it : _deletedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			delete record;
-		}
-		// Delete the snapshot
-		for(const auto &it : _originalFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			delete record;
-		}
-		// All done
-		_updatedFlowFiles.clear();
-		_addedFlowFiles.clear();
-		_clonedFlowFiles.clear();
-		_deletedFlowFiles.clear();
-		_originalFlowFiles.clear();
-		// persistent the provenance report
-		if (this->_provenanceReport)
-			this->_provenanceReport->commit();
-		logger_->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
-	}
-	catch (std::exception &exception)
-	{
-		logger_->log_debug("Caught Exception %s", exception.what());
-		throw;
-	}
-	catch (...)
-	{
-		logger_->log_debug("Caught Exception during process session commit");
-		throw;
-	}
-}
-
-
-void ProcessSession::rollback()
-{
-	try
-	{
-		// Requeue the snapshot of the flowfile back
-		for(const auto &it : _originalFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			if (record->_orginalConnection)
-			{
-				record->_snapshot = false;
-				record->_orginalConnection->put(record);
-			}
-			else
-				delete record;
-		}
-		_originalFlowFiles.clear();
-		// Process the clone flow files
-		for(const auto &it : _clonedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			delete record;
-		}
-		_clonedFlowFiles.clear();
-		for(const auto &it : _addedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			delete record;
-		}
-		_addedFlowFiles.clear();
-		for(const auto &it : _updatedFlowFiles)
-		{
-			FlowFileRecord *record = it.second;
-			delete record;
-		}
-		_updatedFlowFiles.clear();
-		_deletedFlowFiles.clear();
-		logger_->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str());
-	}
-	catch (std::exception &exception)
-	{
-		logger_->log_debug("Caught Exception %s", exception.what());
-		throw;
-	}
-	catch (...)
-	{
-		logger_->log_debug("Caught Exception during process session roll back");
-		throw;
-	}
-}
-
-FlowFileRecord *ProcessSession::get()
-{
-	Connection *first = _processContext->getProcessor()->getNextIncomingConnection();
-
-	if (first == NULL)
-		return NULL;
-
-	Connection *current = first;
-
-	do
-	{
-		std::set<FlowFileRecord *> expired;
-		FlowFileRecord *ret = current->poll(expired);
-		if (expired.size() > 0)
-		{
-			// Remove expired flow record
-			for (std::set<FlowFileRecord *>::iterator it = expired.begin(); it != expired.end(); ++it)
-			{
-				FlowFileRecord *record = *it;
-				std::string details = _processContext->getProcessor()->getName() + " expire flow record " +  record->getUUIDStr();
-				if (_provenanceReport)
-					_provenanceReport->expire(record, details);
-				delete (record);
-			}
-		}
-		if (ret)
-		{
-			// add the flow record to the current process session update map
-			ret->_markedDelete = false;
-			_updatedFlowFiles[ret->getUUIDStr()] = ret;
-			std::map<std::string, std::string> empty;
-			FlowFileRecord *snapshot = new FlowFileRecord(empty);
-			logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
-			snapshot->duplicate(ret);
-			// save a snapshot
-			_originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
-			return ret;
-		}
-		current = _processContext->getProcessor()->getNextIncomingConnection();
-	}
-	while (current != NULL && current != first);
-
-	return NULL;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSessionFactory.cpp b/libminifi/src/ProcessSessionFactory.cpp
deleted file mode 100644
index a105b1c..0000000
--- a/libminifi/src/ProcessSessionFactory.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * @file ProcessSessionFactory.cpp
- * ProcessSessionFactory class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "ProcessSessionFactory.h"
-
-#include <memory>
-
-std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession()
-{
-	return std::unique_ptr<ProcessSession>(new ProcessSession(_processContext));
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
deleted file mode 100644
index 94aaa20..0000000
--- a/libminifi/src/Processor.cpp
+++ /dev/null
@@ -1,526 +0,0 @@
-/**
- * @file Processor.cpp
- * Processor class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <memory>
-#include <functional>
-
-#include "Processor.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include "ProcessSessionFactory.h"
-
-Processor::Processor(std::string name, uuid_t uuid)
-: _name(name)
-{
-	if (!uuid)
-		// Generate the global UUID for the flow record
-		uuid_generate(_uuid);
-	else
-		uuid_copy(_uuid, uuid);
-
-	char uuidStr[37];
-	uuid_unparse_lower(_uuid, uuidStr);
-	_uuidStr = uuidStr;
-	_hasWork.store(false);
-	// Setup the default values
-	_state = DISABLED;
-	_strategy = TIMER_DRIVEN;
-	_lossTolerant = false;
-	_triggerWhenEmpty = false;
-	_schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
-	_runDurantionNano = 0;
-	_yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
-	_penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
-	_maxConcurrentTasks = 1;
-	_activeTasks = 0;
-	_yieldExpiration = 0;
-	_incomingConnectionsIter = this->_incomingConnections.begin();
-	logger_ = Logger::getLogger();
-	logger_->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
-}
-
-Processor::~Processor()
-{
-
-}
-
-bool Processor::isRunning()
-{
-	return (_state == RUNNING && _activeTasks > 0);
-}
-
-void Processor::setScheduledState(ScheduledState state)
-{
-	_state = state;
-}
-
-bool Processor::setSupportedProperties(std::set<Property> properties)
-{
-	if (isRunning())
-	{
-		logger_->log_info("Can not set processor property while the process %s is running",
-				_name.c_str());
-		return false;
-	}
-
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	_properties.clear();
-	for (auto item : properties)
-	{
-		_properties[item.getName()] = item;
-		logger_->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str());
-	}
-
-	return true;
-}
-
-bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
-{
-	if (isRunning())
-	{
-		logger_->log_info("Can not set processor supported relationship while the process %s is running",
-				_name.c_str());
-		return false;
-	}
-
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	_relationships.clear();
-	for(auto item : relationships)
-	{
-		_relationships[item.getName()] = item;
-		logger_->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str());
-	}
-
-	return true;
-}
-
-bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships)
-{
-	if (isRunning())
-	{
-		logger_->log_info("Can not set processor auto terminated relationship while the process %s is running",
-				_name.c_str());
-		return false;
-	}
-
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	_autoTerminatedRelationships.clear();
-	for(auto item : relationships)
-	{
-		_autoTerminatedRelationships[item.getName()] = item;
-		logger_->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str());
-	}
-
-	return true;
-}
-
-bool Processor::isAutoTerminated(Relationship relationship)
-{
-	bool isRun = isRunning();
-		
-	auto conditionalLock = !isRun ? 
-			  std::unique_lock<std::mutex>() 
-			: std::unique_lock<std::mutex>(_mtx);
-
-	const auto &it = _autoTerminatedRelationships.find(relationship.getName());
-	if (it != _autoTerminatedRelationships.end())
-	{
-		return true;
-	}
-	else
-	{
-		return false;
-	}
-}
-
-bool Processor::isSupportedRelationship(Relationship relationship)
-{
-	bool isRun = isRunning();
-
-	auto conditionalLock = !isRun ? 
-			  std::unique_lock<std::mutex>() 
-			: std::unique_lock<std::mutex>(_mtx);
-
-	const auto &it = _relationships.find(relationship.getName());
-	if (it != _relationships.end())
-	{
-		return true;
-	}
-	else
-	{
-		return false;
-	}
-}
-
-bool Processor::getProperty(std::string name, std::string &value)
-{
-	bool isRun = isRunning();
-
-	
-	 auto conditionalLock = !isRun ? 
-                           std::unique_lock<std::mutex>() 
-                         : std::unique_lock<std::mutex>(_mtx);
-			 
-	const auto &it = _properties.find(name);
-	if (it != _properties.end())
-	{
-		Property item = it->second;
-		value = item.getValue();
-		return true;
-	}
-	else
-	{
-		return false;
-	}
-}
-
-bool Processor::setProperty(std::string name, std::string value)
-{
-
-	std::lock_guard<std::mutex> lock(_mtx);
-	auto &&it = _properties.find(name);
-
-	if (it != _properties.end())
-	{
-		Property item = it->second;
-		item.setValue(value);
-		_properties[item.getName()] = item;
-		logger_->log_info("Processor %s property name %s value %s", _name.c_str(), item.getName().c_str(), value.c_str());
-		return true;
-	}
-	else
-	{
-		return false;
-	}
-}
-
-bool Processor::setProperty(Property prop, std::string value) {
-
-	std::lock_guard<std::mutex> lock(_mtx);
-	auto it = _properties.find(
-			prop.getName());
-
-	if (it != _properties.end()) {
-		Property item = it->second;
-		item.setValue(value);
-		_properties[item.getName()] = item;
-		logger_->log_info("Processor %s property name %s value %s",
-				_name.c_str(), item.getName().c_str(), value.c_str());
-		return true;
-	} else {
-		Property newProp(prop);
-		newProp.setValue(value);
-		_properties.insert(
-				std::pair<std::string, Property>(prop.getName(), newProp));
-		return true;
-
-		return false;
-	}
-}
-
-std::set<Connection *> Processor::getOutGoingConnections(std::string relationship)
-{
-	std::set<Connection *> empty;
-
-	auto  &&it = _outGoingConnections.find(relationship);
-	if (it != _outGoingConnections.end())
-	{
-		return _outGoingConnections[relationship];
-	}
-	else
-	{
-		return empty;
-	}
-}
-
-bool Processor::addConnection(Connection *connection)
-{
-	bool ret = false;
-
-
-	if (isRunning())
-	{
-		logger_->log_info("Can not add connection while the process %s is running",
-				_name.c_str());
-		return false;
-	}
-
-
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	uuid_t srcUUID;
-	uuid_t destUUID;
-
-	connection->getSourceProcessorUUID(srcUUID);
-	connection->getDestinationProcessorUUID(destUUID);
-	char uuid_str[37];
-
-
-	uuid_unparse_lower(_uuid, uuid_str);
-	std::string my_uuid = uuid_str;
-	uuid_unparse_lower(destUUID, uuid_str);
-	std::string destination_uuid = uuid_str;
-	if (my_uuid == destination_uuid)
-	{
-		// Connection is destination to the current processor
-		if (_incomingConnections.find(connection) == _incomingConnections.end())
-		{
-			_incomingConnections.insert(connection);
-			connection->setDestinationProcessor(this);
-			logger_->log_info("Add connection %s into Processor %s incoming connection",
-					connection->getName().c_str(), _name.c_str());
-			_incomingConnectionsIter = this->_incomingConnections.begin();
-			ret = true;
-		}
-	}
-	uuid_unparse_lower(srcUUID, uuid_str);
-	std::string source_uuid = uuid_str;
-	if (my_uuid == source_uuid)
-	{
-		std::string relationship = connection->getRelationship().getName();
-		// Connection is source from the current processor
-		auto &&it =
-				_outGoingConnections.find(relationship);
-		if (it != _outGoingConnections.end())
-		{
-			// We already has connection for this relationship
-			std::set<Connection *> existedConnection = it->second;
-			if (existedConnection.find(connection) == existedConnection.end())
-			{
-				// We do not have the same connection for this relationship yet
-				existedConnection.insert(connection);
-				connection->setSourceProcessor(this);
-				_outGoingConnections[relationship] = existedConnection;
-				logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
-												connection->getName().c_str(), _name.c_str(), relationship.c_str());
-				ret = true;
-			}
-		}
-		else
-		{
-
-			// We do not have any outgoing connection for this relationship yet
-			std::set<Connection *> newConnection;
-			newConnection.insert(connection);
-			connection->setSourceProcessor(this);
-			_outGoingConnections[relationship] = newConnection;
-			logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
-								connection->getName().c_str(), _name.c_str(), relationship.c_str());
-			ret = true;
-		}
-	}
-	
-
-	return ret;
-}
-
-void Processor::removeConnection(Connection *connection)
-{
-	if (isRunning())
-	{
-		logger_->log_info("Can not remove connection while the process %s is running",
-				_name.c_str());
-		return;
-	}
-
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	uuid_t srcUUID;
-	uuid_t destUUID;
-
-	connection->getSourceProcessorUUID(srcUUID);
-	connection->getDestinationProcessorUUID(destUUID);
-
-	if (uuid_compare(_uuid, destUUID) == 0)
-	{
-		// Connection is destination to the current processor
-		if (_incomingConnections.find(connection) != _incomingConnections.end())
-		{
-			_incomingConnections.erase(connection);
-			connection->setDestinationProcessor(NULL);
-			logger_->log_info("Remove connection %s into Processor %s incoming connection",
-					connection->getName().c_str(), _name.c_str());
-			_incomingConnectionsIter = this->_incomingConnections.begin();
-		}
-	}
-
-	if (uuid_compare(_uuid, srcUUID) == 0)
-	{
-		std::string relationship = connection->getRelationship().getName();
-		// Connection is source from the current processor
-		auto &&it =
-				_outGoingConnections.find(relationship);
-		if (it == _outGoingConnections.end())
-		{
-			return;
-		}
-		else
-		{
-			if (_outGoingConnections[relationship].find(connection) != _outGoingConnections[relationship].end())
-			{
-				_outGoingConnections[relationship].erase(connection);
-				connection->setSourceProcessor(NULL);
-				logger_->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s",
-								connection->getName().c_str(), _name.c_str(), relationship.c_str());
-			}
-		}
-	}
-}
-
-Connection *Processor::getNextIncomingConnection()
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	if (_incomingConnections.size() == 0)
-		return NULL;
-
-	if (_incomingConnectionsIter == _incomingConnections.end())
-		_incomingConnectionsIter = _incomingConnections.begin();
-
-	Connection *ret = *_incomingConnectionsIter;
-	_incomingConnectionsIter++;
-
-	if (_incomingConnectionsIter == _incomingConnections.end())
-		_incomingConnectionsIter = _incomingConnections.begin();
-
-	return ret;
-}
-
-bool Processor::flowFilesQueued()
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	if (_incomingConnections.size() == 0)
-		return false;
-
-	for(auto &&connection : _incomingConnections)
-	{
-		if (connection->getQueueSize() > 0)
-			return true;
-	}
-
-	return false;
-}
-
-bool Processor::flowFilesOutGoingFull()
-{
-	std::lock_guard<std::mutex> lock(_mtx);
-
-	 for(auto &&connection : _outGoingConnections)
-	{
-		// We already has connection for this relationship
-		std::set<Connection *> existedConnection = connection.second;
-		for(const auto connection : existedConnection)
-		{
-			if (connection->isFull())
-				return true;
-		}
-	}
-
-	return false;
-}
-
-void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory)
-{
-	auto session = sessionFactory->createSession();
-
-	try {
-		// Call the virtual trigger function
-		onTrigger(context, session.get());
-		session->commit();
-	}
-	catch (std::exception &exception)
-	{
-		logger_->log_debug("Caught Exception %s", exception.what());
-		session->rollback();
-		throw;
-	}
-	catch (...)
-	{
-		logger_->log_debug("Caught Exception Processor::onTrigger");
-		session->rollback();
-		throw;
-	}
-}
-
-void Processor::waitForWork(uint64_t timeoutMs)
-{
-	_hasWork.store( isWorkAvailable() );
-
-	if (!_hasWork.load())
-	{
-	    std::unique_lock<std::mutex> lock(_workAvailableMtx);
-	    _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork.load(); });
-	}
-
-}
-
-void Processor::notifyWork()
-{
-	// Do nothing if we are not event-driven
-	if (_strategy != EVENT_DRIVEN)
-	{
-		return;
-	}
-
-	{
-		_hasWork.store( isWorkAvailable() );
-
-
-		if (_hasWork.load())
-		{
-		      _hasWorkCondition.notify_one();
-		}
-	}
-}
-
-bool Processor::isWorkAvailable()
-{
-	// We have work if any incoming connection has work
-	bool hasWork = false;
-
-	try
-	{
-		for (const auto &conn : getIncomingConnections())
-		{
-			if (conn->getQueueSize() > 0)
-			{
-				hasWork = true;
-				break;
-			}
-		}
-	}
-	catch (...)
-	{
-		logger_->log_error("Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!");
-	}
-
-	return hasWork;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp
deleted file mode 100644
index 58cf730..0000000
--- a/libminifi/src/Provenance.cpp
+++ /dev/null
@@ -1,566 +0,0 @@
-/**
- * @file Provenance.cpp
- * Provenance implemenatation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <cstdint>
-#include <vector>
-#include <arpa/inet.h>
-#include "io/DataStream.h"
-#include "io/Serializable.h"
-#include "Provenance.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-
-//! DeSerialize
-bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo,
-		std::string key) {
-	std::string value;
-	bool ret;
-
-	ret = repo->Get(key, value);
-
-	if (!ret) {
-		logger_->log_error("NiFi Provenance Store event %s can not found",
-				key.c_str());
-		return false;
-	} else
-		logger_->log_debug("NiFi Provenance Read event %s length %d",
-				key.c_str(), value.length());
-
-
-	DataStream stream((const uint8_t*)value.data(),value.length());
-
-	ret = DeSerialize(stream);
-
-	if (ret) {
-		logger_->log_debug(
-				"NiFi Provenance retrieve event %s size %d eventType %d success",
-				_eventIdStr.c_str(), stream.getSize(), _eventType);
-	} else {
-		logger_->log_debug(
-				"NiFi Provenance retrieve event %s size %d eventType %d fail",
-				_eventIdStr.c_str(), stream.getSize(), _eventType);
-	}
-
-	return ret;
-}
-
-bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) {
-
-	DataStream outStream;
-
-	int ret;
-
-	ret = writeUTF(this->_eventIdStr,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	uint32_t eventType = this->_eventType;
-	ret = write(eventType,&outStream);
-	if (ret != 4) {
-
-		return false;
-	}
-
-	ret = write(this->_eventTime,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = write(this->_entryDate,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = write(this->_eventDuration,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = write(this->_lineageStartDate,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = writeUTF(this->_componentId,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	ret = writeUTF(this->_componentType,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	ret = writeUTF(this->_uuid,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	ret = writeUTF(this->_details,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	// write flow attributes
-	uint32_t numAttributes = this->_attributes.size();
-	ret = write(numAttributes,&outStream);
-	if (ret != 4) {
-
-		return false;
-	}
-
-	for (auto itAttribute : _attributes) {
-		ret = writeUTF(itAttribute.first,&outStream, true);
-		if (ret <= 0) {
-
-			return false;
-		}
-		ret = writeUTF(itAttribute.second,&outStream, true);
-		if (ret <= 0) {
-
-			return false;
-		}
-	}
-
-	ret = writeUTF(this->_contentFullPath,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	ret = write(this->_size,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = write(this->_offset,&outStream);
-	if (ret != 8) {
-
-		return false;
-	}
-
-	ret = writeUTF(this->_sourceQueueIdentifier,&outStream);
-	if (ret <= 0) {
-
-		return false;
-	}
-
-	if (this->_eventType == ProvenanceEventRecord::FORK
-			|| this->_eventType == ProvenanceEventRecord::CLONE
-			|| this->_eventType == ProvenanceEventRecord::JOIN) {
-		// write UUIDs
-		uint32_t number = this->_parentUuids.size();
-		ret = write(number,&outStream);
-		if (ret != 4) {
-
-			return false;
-		}
-		for (auto parentUUID : _parentUuids) {
-			ret = writeUTF(parentUUID,&outStream);
-			if (ret <= 0) {
-
-				return false;
-			}
-		}
-		number = this->_childrenUuids.size();
-		ret = write(number,&outStream);
-		if (ret != 4) {
-			return false;
-		}
-		for (auto childUUID : _childrenUuids) {
-			ret = writeUTF(childUUID,&outStream);
-			if (ret <= 0) {
-
-				return false;
-			}
-		}
-	} else if (this->_eventType == ProvenanceEventRecord::SEND
-			|| this->_eventType == ProvenanceEventRecord::FETCH) {
-		ret = writeUTF(this->_transitUri,&outStream);
-		if (ret <= 0) {
-
-			return false;
-		}
-	} else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
-		ret = writeUTF(this->_transitUri,&outStream);
-		if (ret <= 0) {
-
-			return false;
-		}
-		ret = writeUTF(this->_sourceSystemFlowFileIdentifier,&outStream);
-		if (ret <= 0) {
-
-			return false;
-		}
-	}
-
-	// Persistent to the DB
-
-	if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
-		logger_->log_debug("NiFi Provenance Store event %s size %d success",
-				_eventIdStr.c_str(), outStream.getSize());
-		return true;
-	} else {
-		logger_->log_error("NiFi Provenance Store event %s size %d fail",
-				_eventIdStr.c_str(), outStream.getSize());
-		return false;
-	}
-
-	// cleanup
-
-	return true;
-}
-
-bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
-
-	int ret;
-
-	DataStream outStream(buffer,bufferSize);
-
-	ret = readUTF(this->_eventIdStr,&outStream);
-
-	if (ret <= 0) {
-		return false;
-	}
-
-	uint32_t eventType;
-	ret = read(eventType,&outStream);
-	if (ret != 4) {
-		return false;
-	}
-	this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
-
-	ret = read(this->_eventTime,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = read(this->_entryDate,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = read(this->_eventDuration,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = read(this->_lineageStartDate,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = readUTF(this->_componentId,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	ret = readUTF(this->_componentType,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	ret = readUTF(this->_uuid,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	ret = readUTF(this->_details,&outStream);
-
-	if (ret <= 0) {
-		return false;
-	}
-
-	// read flow attributes
-	uint32_t numAttributes = 0;
-	ret = read(numAttributes,&outStream);
-	if (ret != 4) {
-		return false;
-	}
-
-	for (uint32_t i = 0; i < numAttributes; i++) {
-		std::string key;
-		ret = readUTF(key,&outStream, true);
-		if (ret <= 0) {
-			return false;
-		}
-		std::string value;
-		ret = readUTF(value,&outStream, true);
-		if (ret <= 0) {
-			return false;
-		}
-		this->_attributes[key] = value;
-	}
-
-	ret = readUTF(this->_contentFullPath,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	ret = read(this->_size,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = read(this->_offset,&outStream);
-	if (ret != 8) {
-		return false;
-	}
-
-	ret = readUTF(this->_sourceQueueIdentifier,&outStream);
-	if (ret <= 0) {
-		return false;
-	}
-
-	if (this->_eventType == ProvenanceEventRecord::FORK
-			|| this->_eventType == ProvenanceEventRecord::CLONE
-			|| this->_eventType == ProvenanceEventRecord::JOIN) {
-		// read UUIDs
-		uint32_t number = 0;
-		ret = read(number,&outStream);
-		if (ret != 4) {
-			return false;
-		}
-
-
-		for (uint32_t i = 0; i < number; i++) {
-			std::string parentUUID;
-			ret = readUTF(parentUUID,&outStream);
-			if (ret <= 0) {
-				return false;
-			}
-			this->addParentUuid(parentUUID);
-		}
-		number = 0;
-		ret = read(number,&outStream);
-		if (ret != 4) {
-			return false;
-		}
-		for (uint32_t i = 0; i < number; i++) {
-			std::string childUUID;
-			ret = readUTF(childUUID,&outStream);
-			if (ret <= 0) {
-				return false;
-			}
-			this->addChildUuid(childUUID);
-		}
-	} else if (this->_eventType == ProvenanceEventRecord::SEND
-			|| this->_eventType == ProvenanceEventRecord::FETCH) {
-		ret = readUTF(this->_transitUri,&outStream);
-		if (ret <= 0) {
-			return false;
-		}
-	} else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
-		ret = readUTF(this->_transitUri,&outStream);
-		if (ret <= 0) {
-			return false;
-		}
-		ret = readUTF(this->_sourceSystemFlowFileIdentifier,&outStream);
-		if (ret <= 0) {
-			return false;
-		}
-	}
-
-	return true;
-}
-
-void ProvenanceReporter::commit() {
-	if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable())
-		return;
-	for (auto event : _events) {
-		if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) {
-			event->Serialize(
-					FlowControllerFactory::getFlowController()->getProvenanceRepository());
-		} else {
-			logger_->log_debug("Provenance Repository is full");
-		}
-	}
-}
-
-void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE,
-			flow);
-
-	if (event) {
-		event->setDetails(detail);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation,
-		std::string detail, uint64_t processingDuration) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow);
-
-	if (event) {
-		event->setDetails(detail);
-		event->setRelationship(relation.getName());
-		event->setEventDuration(processingDuration);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow,
-		std::string detail) {
-	ProvenanceEventRecord *event = allocate(
-			ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
-
-	if (event) {
-		event->setDetails(detail);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail,
-		uint64_t processingDuration) {
-	ProvenanceEventRecord *event = allocate(
-			ProvenanceEventRecord::CONTENT_MODIFIED, flow);
-
-	if (event) {
-		event->setDetails(detail);
-		event->setEventDuration(processingDuration);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE,
-			parent);
-
-	if (event) {
-		event->addChildFlowFile(child);
-		event->addParentFlowFile(parent);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents,
-		FlowFileRecord *child, std::string detail,
-		uint64_t processingDuration) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child);
-
-	if (event) {
-		event->addChildFlowFile(child);
-		std::vector<FlowFileRecord *>::iterator it;
-		for (it = parents.begin(); it != parents.end(); it++) {
-			FlowFileRecord *record = *it;
-			event->addParentFlowFile(record);
-		}
-		event->setDetails(detail);
-		event->setEventDuration(processingDuration);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child,
-		FlowFileRecord *parent, std::string detail,
-		uint64_t processingDuration) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK,
-			parent);
-
-	if (event) {
-		event->addParentFlowFile(parent);
-		std::vector<FlowFileRecord *>::iterator it;
-		for (it = child.begin(); it != child.end(); it++) {
-			FlowFileRecord *record = *it;
-			event->addChildFlowFile(record);
-		}
-		event->setDetails(detail);
-		event->setEventDuration(processingDuration);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE,
-			flow);
-
-	if (event) {
-		event->setDetails(detail);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow);
-
-	if (event) {
-		std::string dropReason = "Discard reason: " + reason;
-		event->setDetails(dropReason);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri,
-		std::string detail, uint64_t processingDuration, bool force) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow);
-
-	if (event) {
-		event->setTransitUri(transitUri);
-		event->setDetails(detail);
-		event->setEventDuration(processingDuration);
-		if (!force) {
-			add(event);
-		} else {
-			if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull())
-				event->Serialize(
-						FlowControllerFactory::getFlowController()->getProvenanceRepository());
-			delete event;
-		}
-	}
-}
-
-void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri,
-		std::string sourceSystemFlowFileIdentifier, std::string detail,
-		uint64_t processingDuration) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE,
-			flow);
-
-	if (event) {
-		event->setTransitUri(transitUri);
-		event->setDetails(detail);
-		event->setEventDuration(processingDuration);
-		event->setSourceSystemFlowFileIdentifier(
-				sourceSystemFlowFileIdentifier);
-		add(event);
-	}
-}
-
-void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri,
-		std::string detail, uint64_t processingDuration) {
-	ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow);
-
-	if (event) {
-		event->setTransitUri(transitUri);
-		event->setDetails(detail);
-		event->setEventDuration(processingDuration);
-		add(event);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp
deleted file mode 100644
index d7cc83a..0000000
--- a/libminifi/src/PutFile.cpp
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * @file PutFile.cpp
- * PutFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <uuid/uuid.h>
-
-#include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
-#include "PutFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace");
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore");
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");
-
-const std::string PutFile::ProcessorName("PutFile");
-
-Property PutFile::Directory("Output Directory", "The output directory to which to put files", ".");
-Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL);
-
-Relationship PutFile::Success("success", "All files are routed to success");
-Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");
-
-void PutFile::initialize()
-{
-	//! Set the supported properties
-	std::set<Property> properties;
-	properties.insert(Directory);
-	properties.insert(ConflictResolution);
-	setSupportedProperties(properties);
-	//! Set the supported relationships
-	std::set<Relationship> relationships;
-	relationships.insert(Success);
-	relationships.insert(Failure);
-	setSupportedRelationships(relationships);
-}
-
-void PutFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-	std::string directory;
-
-	if (!context->getProperty(Directory.getName(), directory))
-	{
-		logger_->log_error("Directory attribute is missing or invalid");
-		return;
-	}
-
-	std::string conflictResolution;
-
-	if (!context->getProperty(ConflictResolution.getName(), conflictResolution))
-	{
-		logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
-		return;
-	}
-
-	FlowFileRecord *flowFile = session->get();
-
-	// Do nothing if there are no incoming files
-	if (!flowFile)
-	{
-		return;
-	}
-
-	std::string filename;
-	flowFile->getAttribute(FILENAME, filename);
-
-	// Generate a safe (universally-unique) temporary filename on the same partition 
-	char tmpFileUuidStr[37];
-	uuid_t tmpFileUuid;
-	uuid_generate(tmpFileUuid);
-	uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr);
-	std::stringstream tmpFileSs;
-	tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr;
-	std::string tmpFile = tmpFileSs.str();
-	logger_->log_info("PutFile using temporary file %s", tmpFile.c_str());
-
-	// Determine dest full file paths
-	std::stringstream destFileSs;
-	destFileSs << directory << "/" << filename;
-	std::string destFile = destFileSs.str();
-
-	logger_->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory.c_str());
-
-	// If file exists, apply conflict resolution strategy
-	struct stat statResult;
-
-	if (stat(destFile.c_str(), &statResult) == 0)
-	{
-		logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflictResolution.c_str());
-
-		if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE)
-		{
-			putFile(session, flowFile, tmpFile, destFile);
-		}
-		else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE)
-		{
-			session->transfer(flowFile, Success);
-		}
-		else
-		{
-			session->transfer(flowFile, Failure);
-		}
-	}
-	else
-	{
-		putFile(session, flowFile, tmpFile, destFile);
-	}
-}
-
-bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile)
-{
-
-	ReadCallback cb(tmpFile, destFile);
-    	session->read(flowFile, &cb);
-
-	if (cb.commit())
-	{
-		session->transfer(flowFile, Success);
-		return true;
-	}
-	else
-	{
-		session->transfer(flowFile, Failure);
-	}
-	return false;
-}
-
-PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
-: _tmpFile(tmpFile)
-, _tmpFileOs(tmpFile)
-, _destFile(destFile)
-{
-	logger_ = Logger::getLogger();
-}
-
-// Copy the entire file contents to the temporary file
-void PutFile::ReadCallback::process(std::ifstream *stream)
-{
-	// Copy file contents into tmp file
-	_writeSucceeded = false;
-	_tmpFileOs << stream->rdbuf();
-	_writeSucceeded = true;
-}
-
-// Renames tmp file to final destination
-// Returns true if commit succeeded
-bool PutFile::ReadCallback::commit()
-{
-	bool success = false;
-
-	logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str());
-
-	if (_writeSucceeded)
-	{
-		_tmpFileOs.close();
-
-		if (rename(_tmpFile.c_str(), _destFile.c_str()))
-		{
-			logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str());
-		}
-		else
-		{
-			success = true;
-			logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
-		}
-	}
-	else
-	{
-		logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
-	}
-
-	return success;
-}
-
-// Clean up resources
-PutFile::ReadCallback::~ReadCallback() {
-	// Close tmp file
-	_tmpFileOs.close();
-
-	// Clean up tmp file, if necessary
-	unlink(_tmpFile.c_str());
-}


Mime
View raw message