nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MINIFI-231) FlowFile Persistent
Date Thu, 02 Mar 2017 16:15:45 GMT

    [ https://issues.apache.org/jira/browse/MINIFI-231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15892511#comment-15892511 ]

ASF GitHub Bot commented on MINIFI-231:
---------------------------------------

Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r103961630
  
    --- Diff: libminifi/src/Repository.cpp ---
    @@ -0,0 +1,140 @@
    +/**
    + * @file Repository.cpp
    + * Repository implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include <cstdint>
    +#include <vector>
    +#include <arpa/inet.h>
    +#include "io/DataStream.h"
    +#include "io/Serializable.h"
    +#include "Relationship.h"
    +#include "Logger.h"
    +#include "FlowController.h"
    +#include "Repository.h"
    +#include "Provenance.h"
    +#include "FlowFileRepository.h"
    +
    +// Human-readable label for each repository type, indexed by the repository's _type value.
    +// These labels are substituted into the "%s ..." log messages emitted by this file.
    +const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenance Repository", "FlowFile Repository"};
    +// Per-repository-type size counters, all starting at zero.
    +uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0};
    +
    +/**
    + * Start the repository monitor thread.
    + *
    + * Returns without doing anything when the repository is not enabled, when
    + * the purge period is not positive, or when the monitor is already marked
    + * as running. Otherwise the running flag is set, the start is logged, and
    + * a new thread executing Repository::run(this) is created and detached.
    + */
    +void Repository::start() {
    +	// Single guard: disabled, no purge period configured, or already started.
    +	if (!_enable || this->_purgePeriod <= 0 || _running) {
    +		return;
    +	}
    +
    +	// Set the flag before the thread exists so run() sees it on its first loop check.
    +	_running = true;
    +	logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
    +
    +	// The worker is detached right away, so it is never joined.
    +	_thread = new std::thread(run, this);
    +	_thread->detach();
    +}
    +
    +/**
    + * Request the repository monitor thread to stop.
    + *
    + * When the monitor is marked as running, the running flag is cleared and
    + * the stop is logged; otherwise the call has no effect. This function only
    + * clears the flag and does not wait for the thread.
    + */
    +void Repository::stop() {
    +	if (_running) {
    +		_running = false;
    +		logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
    +	}
    +}
    +
    +void Repository::run(Repository *repo) {
    +	// threshold for purge
    +	uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
    +	while (repo->_running) {
    +		std::this_thread::sleep_for(
    +				std::chrono::milliseconds(repo->_purgePeriod));
    +		uint64_t curTime = getTimeMillis();
    +		uint64_t size = repo->repoSize();
    +		if (size >= purgeThreshold) {
    +			std::vector<std::string> purgeList;
    +			leveldb::Iterator* it = repo->_db->NewIterator(
    +					leveldb::ReadOptions());
    +			if (repo->_type == PROVENANCE)
    +			{
    +				for (it->SeekToFirst(); it->Valid(); it->Next()) {
    +					ProvenanceEventRecord eventRead;
    +					std::string key = it->key().ToString();
    +					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
    +						(int) it->value().size())) {
    +						if ((curTime - eventRead.getEventTime())
    +							> repo->_maxPartitionMillis)
    +							purgeList.push_back(key);
    +					} else {
    +						repo->logger_->log_debug(
    +							"NiFi %s retrieve event %s fail",
    +							RepositoryTypeStr[repo->_type],
    +							key.c_str());
    +						purgeList.push_back(key);
    +					}
    +				}
    +			}
    +			if (repo->_type == FLOWFILE)
    +			{
    +				for (it->SeekToFirst(); it->Valid(); it->Next()) {
    +					FlowFileEventRecord eventRead;
    +					std::string key = it->key().ToString();
    +					if (eventRead.DeSerialize((uint8_t *) it->value().data(),
    +						(int) it->value().size())) {
    +						if ((curTime - eventRead.getEventTime())
    +							> repo->_maxPartitionMillis)
    +							purgeList.push_back(key);
    +					} else {
    +						repo->logger_->log_debug(
    +							"NiFi %s retrieve event %s fail",
    +							RepositoryTypeStr[repo->_type],
    +							key.c_str());
    +						purgeList.push_back(key);
    +					}
    +				}
    +			}
    +			delete it;
    +			std::vector<std::string>::iterator itPurge;
    +			for (itPurge = purgeList.begin(); itPurge != purgeList.end();
    --- End diff --
    
    why not use auto?


> FlowFile Persistent 
> --------------------
>
>                 Key: MINIFI-231
>                 URL: https://issues.apache.org/jira/browse/MINIFI-231
>             Project: Apache NiFi MiNiFi
>          Issue Type: Bug
>          Components: Core Framework, Data Format
>    Affects Versions: cpp-1.0.0
>            Reporter: bqiu
>            Assignee: bqiu
>             Fix For: cpp-1.0.0
>
>
> Add FlowFile persistent support



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message