Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EB323200C2E for ; Sun, 5 Mar 2017 18:23:42 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E9C27160B6B; Sun, 5 Mar 2017 17:23:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 19F22160B57 for ; Sun, 5 Mar 2017 18:23:41 +0100 (CET) Received: (qmail 41105 invoked by uid 500); 5 Mar 2017 17:23:41 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 41096 invoked by uid 99); 5 Mar 2017 17:23:41 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Mar 2017 17:23:41 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id DB4961A0276 for ; Sun, 5 Mar 2017 17:23:40 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -2.347 X-Spam-Level: X-Spam-Status: No, score=-2.347 tagged_above=-999 required=6.31 tests=[RP_MATCHES_RCVD=-2.999, SPF_NEUTRAL=0.652] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id G08NdyRXZFMV for ; Sun, 5 Mar 2017 17:23:39 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 770405F479 for ; Sun, 5 Mar 2017 17:23:38 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id C84BAE02F7 for ; Sun, 5 Mar 2017 17:23:33 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 0A6D924168 for ; Sun, 5 Mar 2017 17:23:33 +0000 (UTC) Date: Sun, 5 Mar 2017 17:23:33 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: commits@nifi.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (MINIFI-231) FlowFile Persistent MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Sun, 05 Mar 2017 17:23:43 -0000 [ https://issues.apache.org/jira/browse/MINIFI-231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896444#comment-15896444 ] ASF GitHub Bot commented on MINIFI-231: --------------------------------------- Github user benqiu2016 commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/62#discussion_r104318245 --- Diff: libminifi/include/Repository.h --- @@ -0,0 +1,294 @@ +/** + * @file Repository + * Repository class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __REPOSITORY_H__ +#define __REPOSITORY_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" +#include "Configure.h" +#include "Connection.h" +#include "FlowFileRecord.h" +#include "Logger.h" +#include "Property.h" +#include "ResourceClaim.h" +#include "io/Serializable.h" +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" + +//! Repository +class Repository +{ +public: + enum RepositoryType { + //! Provenance Repo Type + PROVENANCE, + //! FlowFile Repo Type + FLOWFILE, + MAX_REPO_TYPE + }; + static const char *RepositoryTypeStr[MAX_REPO_TYPE]; + //! Constructor + /*! + * Create a new provenance repository + */ + Repository(RepositoryType type, std::string directory, + int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) { + _type = type; + _directory = directory; + _maxPartitionMillis = maxPartitionMillis; + _maxPartitionBytes = maxPartitionBytes; + _purgePeriod = purgePeriod; + logger_ = Logger::getLogger(); + configure_ = Configure::getConfigure(); + _db = NULL; + _thread = NULL; + _running = false; + _repoFull = false; + _enable = true; + } + + //! Destructor + virtual ~Repository() { + stop(); + if (this->_thread) + delete this->_thread; + destroy(); + } + + //! initialize + virtual bool initialize() + { + std::string value; + + if (_type == PROVENANCE) + { + if (!(configure_->get(Configure::nifi_provenance_repository_enable, value) + && StringUtils::StringToBool(value, _enable))) { + _enable = true; + } + if (!_enable) + return false; + if (configure_->get(Configure::nifi_provenance_repository_directory_default, value)) + { + _directory = value; + } + logger_->log_info("NiFi Provenance Repository Directory %s", _directory.c_str()); + if (configure_->get(Configure::nifi_provenance_repository_max_storage_size, value)) + { + Property::StringToInt(value, _maxPartitionBytes); + } + logger_->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes); + if (configure_->get(Configure::nifi_provenance_repository_max_storage_time, value)) + { + TimeUnit unit; + if (Property::StringToTime(value, _maxPartitionMillis, unit) && + Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis)) + { + } + } + logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis); + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db); + if (status.ok()) + { + logger_->log_info("NiFi Provenance Repository database open %s success", _directory.c_str()); + } + else + { + logger_->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str()); + return false; + } + } + + if (_type == FLOWFILE) + { + if (!(configure_->get(Configure::nifi_flowfile_repository_enable, value) + && StringUtils::StringToBool(value, _enable))) { + _enable = true; + } + if (!_enable) + return false; + if (configure_->get(Configure::nifi_flowfile_repository_directory_default, value)) + { + _directory = value; + } + logger_->log_info("NiFi FlowFile Repository Directory %s", _directory.c_str()); + if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value)) + { + Property::StringToInt(value, _maxPartitionBytes); + } + logger_->log_info("NiFi FlowFile Max Partition Bytes %d", _maxPartitionBytes); + if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value)) + { + TimeUnit unit; + if (Property::StringToTime(value, _maxPartitionMillis, unit) && + Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis)) + { + } + } + logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", _maxPartitionMillis); + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db); + if (status.ok()) + { + logger_->log_info("NiFi FlowFile Repository database open %s success", _directory.c_str()); + } + else + { + logger_->log_error("NiFi FlowFile Repository database open %s fail", _directory.c_str()); + return false; + } + } + + return true; + } + //! Put + virtual bool Put(std::string key, uint8_t *buf, int bufLen) + { + if (!_enable) + return false; + + // persistent to the DB + leveldb::Slice value((const char *) buf, bufLen); + leveldb::Status status; + status = _db->Put(leveldb::WriteOptions(), key, value); + if (status.ok()) + return true; + else + return false; + } + //! Delete + virtual bool Delete(std::string key) + { + if (!_enable) + return false; + leveldb::Status status; + status = _db->Delete(leveldb::WriteOptions(), key); + if (status.ok()) + return true; + else + return false; + } + //! Get + virtual bool Get(std::string key, std::string &value) + { + if (!_enable) + return false; + leveldb::Status status; + status = _db->Get(leveldb::ReadOptions(), key, &value); + if (status.ok()) + return true; + else + return false; + } + //! destroy + void destroy() + { + if (_db) --- End diff -- we only call destroy when we delete the repo, will change to a private function. > FlowFile Persistent > -------------------- > > Key: MINIFI-231 > URL: https://issues.apache.org/jira/browse/MINIFI-231 > Project: Apache NiFi MiNiFi > Issue Type: Bug > Components: Core Framework, Data Format > Affects Versions: cpp-1.0.0 > Reporter: bqiu > Assignee: bqiu > Fix For: cpp-1.0.0 > > > Add FlowFile persistent support -- This message was sent by Atlassian JIRA (v6.3.15#6346)