nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MINIFI-231) FlowFile Persistent
Date Thu, 02 Mar 2017 16:14:46 GMT

    [ https://issues.apache.org/jira/browse/MINIFI-231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15892503#comment-15892503
] 

ASF GitHub Bot commented on MINIFI-231:
---------------------------------------

Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103958761
  
    --- Diff: libminifi/include/Repository.h ---
    @@ -0,0 +1,294 @@
    +/**
    + * @file Repository 
    + * Repository class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __REPOSITORY_H__
    +#define __REPOSITORY_H__
    +
    +#include <ftw.h>
    +#include <uuid/uuid.h>
    +#include <atomic>
    +#include <cstdint>
    +#include <cstring>
    +#include <iostream>
    +#include <map>
    +#include <set>
    +#include <string>
    +#include <thread>
    +#include <vector>
    +
    +#include "leveldb/db.h"
    +#include "leveldb/options.h"
    +#include "leveldb/slice.h"
    +#include "leveldb/status.h"
    +#include "Configure.h"
    +#include "Connection.h"
    +#include "FlowFileRecord.h"
    +#include "Logger.h"
    +#include "Property.h"
    +#include "ResourceClaim.h"
    +#include "io/Serializable.h"
    +#include "utils/TimeUtil.h"
    +#include "utils/StringUtils.h"
    +
    +//! Repository
    +class Repository
    +{
    +public:
    +	enum RepositoryType {
    +		//! Provenance Repo Type
    +		PROVENANCE,
    +		//! FlowFile Repo Type
    +		FLOWFILE,
    +		MAX_REPO_TYPE
    +	};
    +	static const char *RepositoryTypeStr[MAX_REPO_TYPE];
    +	//! Constructor
    +	/*!
    +	 * Create a new provenance repository
    +	 */
    +	Repository(RepositoryType type, std::string directory, 
    +		int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) {
    +		_type = type;
    +		_directory = directory;
    +		_maxPartitionMillis = maxPartitionMillis;
    +		_maxPartitionBytes = maxPartitionBytes;
    +		_purgePeriod = purgePeriod;
    +		logger_ = Logger::getLogger();
    +		configure_ = Configure::getConfigure();
    +		_db = NULL;
    +		_thread = NULL;
    +		_running = false;
    +		_repoFull = false;
    +		_enable = true;
    +	}
    +
    +	//! Destructor
    +	virtual ~Repository() {
    +		stop();
    +		if (this->_thread)
    +			delete this->_thread;
    +		destroy();
    +	}
    +
    +	//! initialize
    +	virtual bool initialize()
    +	{
    +		std::string value;
    +
    +		if (_type == PROVENANCE)
    +		{
    +			if (!(configure_->get(Configure::nifi_provenance_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_provenance_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		if (_type == FLOWFILE)
    +		{
    +			if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
    +					&& StringUtils::StringToBool(value, _enable))) {
    +				_enable = true;
    +			}
    +			if (!_enable)
    +				return false;
    +			if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
    +			{
    +				_directory = value;
    +			}
    +			logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str());
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
    +			{
    +				Property::StringToInt(value, _maxPartitionBytes);
    +			}
    +			logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes);
    +			if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
    +			{
    +				TimeUnit unit;
    +				if (Property::StringToTime(value, _maxPartitionMillis, unit) &&
    +							Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
    +				{
    +				}
    +			}
    +			logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis);
    +			leveldb::Options options;
    +			options.create_if_missing = true;
    +			leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db);
    +			if (status.ok())
    +			{
    +				logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str());
    +			}
    +			else
    +			{
    +				logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str());
    +				return false;
    +			}
    +		}
    +
    +		return true;
    +	}
    +	//! Put
    +	virtual bool Put(std::string key, uint8_t *buf, int bufLen)
    +	{
    +		if (!_enable)
    +			return false;
    +			
    +		// persistent to the DB
    +		leveldb::Slice value((const char *) buf, bufLen);
    +		leveldb::Status status;
    +		status = _db->Put(leveldb::WriteOptions(), key, value);
    +		if (status.ok())
    +			return true;
    +		else
    +			return false;
    +	}
    +	//! Delete
    +	virtual bool Delete(std::string key)
    --- End diff --
    
    const std::string &key ?


> FlowFile Persistent 
> --------------------
>
>                 Key: MINIFI-231
>                 URL: https://issues.apache.org/jira/browse/MINIFI-231
>             Project: Apache NiFi MiNiFi
>          Issue Type: Bug
>          Components: Core Framework, Data Format
>    Affects Versions: cpp-1.0.0
>            Reporter: bqiu
>            Assignee: bqiu
>             Fix For: cpp-1.0.0
>
>
> Add FlowFile persistent support



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message