From commits-return-484-archive-asf-public=cust-asf.ponee.io@tubemq.apache.org Wed Jul 8 03:08:44 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 564D5180656 for ; Wed, 8 Jul 2020 05:08:43 +0200 (CEST) Received: (qmail 73528 invoked by uid 500); 8 Jul 2020 03:08:39 -0000 Mailing-List: contact commits-help@tubemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tubemq.apache.org Delivered-To: mailing list commits@tubemq.apache.org Received: (qmail 73176 invoked by uid 99); 8 Jul 2020 03:08:38 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jul 2020 03:08:38 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 98133890B8; Wed, 8 Jul 2020 03:08:38 +0000 (UTC) Date: Wed, 08 Jul 2020 03:08:38 +0000 To: "commits@tubemq.apache.org" Subject: [incubator-tubemq] branch tubemq-client-cpp updated: [TUBEMQ-272]Unified C/C++ files's code style (#193) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <159417771853.12387.16699028108534440599@gitbox.apache.org> From: gosonzhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-tubemq X-Git-Refname: refs/heads/tubemq-client-cpp X-Git-Reftype: branch X-Git-Oldrev: 9883c5c24b7e09a6170e050515575cf07bcfad59 X-Git-Newrev: 9f30453dd9e9297e067eac83ce2e8914d4a6344a X-Git-Rev: 9f30453dd9e9297e067eac83ce2e8914d4a6344a X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch tubemq-client-cpp in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git The following commit(s) were added to refs/heads/tubemq-client-cpp by this push: new 9f30453 [TUBEMQ-272]Unified C/C++ files's code style (#193) 9f30453 is described below commit 9f30453dd9e9297e067eac83ce2e8914d4a6344a Author: gosonzhang <4675739@qq.com> AuthorDate: Wed Jul 8 03:08:15 2020 +0000 [TUBEMQ-272]Unified C/C++ files's code style (#193) Co-authored-by: gosonzhang --- .../tubemq-client-cpp/inc/atomic_def.h | 28 ++- .../tubemq-client-cpp/inc/client_config.h | 108 +++++----- .../tubemq-client-cpp/inc/const_config.h | 60 +++--- .../tubemq-client-cpp/inc/const_rpc.h | 87 ++++---- .../tubemq-client-cpp/inc/file_ini.h | 23 +- .../tubemq-client-cpp/inc/flowctrl_def.h | 132 ++++++------ tubemq-client-twins/tubemq-client-cpp/inc/logger.h | 3 +- .../tubemq-client-cpp/inc/message.h | 44 ++-- .../tubemq-client-cpp/inc/meta_info.h | 81 ++++--- .../tubemq-client-cpp/inc/singleton.h | 2 +- tubemq-client-twins/tubemq-client-cpp/inc/utils.h | 33 +-- .../tubemq-client-cpp/inc/version.h | 14 +- .../tubemq-client-cpp/src/client_config.cc | 163 +++++++------- .../tubemq-client-cpp/src/file_ini.cc | 25 +-- .../tubemq-client-cpp/src/flowctrl_def.cc | 237 +++++++++++---------- .../tubemq-client-cpp/src/logger.cc | 2 - .../tubemq-client-cpp/src/message.cc | 44 ++-- .../tubemq-client-cpp/src/meta_info.cc | 94 ++++---- tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 81 +++---- 19 files changed, 642 insertions(+), 619 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h b/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h index 24df5f3..00d0c24 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/atomic_def.h @@ -17,23 +17,22 @@ * under the License. */ -#ifndef _TUBEMQ_CLIENT_ATOMIC_DEF_H_ -#define _TUBEMQ_CLIENT_ATOMIC_DEF_H_ +#ifndef TUBEMQ_CLIENT_ATOMIC_DEF_H_ +#define TUBEMQ_CLIENT_ATOMIC_DEF_H_ #include - namespace tubemq { using namespace std; class AtomicInteger { public: - AtomicInteger(){ + AtomicInteger() { this->counter_ = 0; } - + AtomicInteger(int initial_value) { this->counter_ = initial_value; } @@ -69,7 +68,7 @@ class AtomicInteger { int next = current + 1; if (__sync_bool_compare_and_swap(&this->counter_, current, next)) { return current; - } + } } } @@ -79,7 +78,7 @@ class AtomicInteger { int next = current - 1; if (__sync_bool_compare_and_swap(&this->counter_, current, next)) { return current; - } + } } } @@ -89,7 +88,7 @@ class AtomicInteger { int next = current + delta; if (__sync_bool_compare_and_swap(&this->counter_, current, next)) { return current; - } + } } } @@ -117,12 +116,12 @@ class AtomicInteger { for ( ; ; ) { int current = this->counter_; int next = current + delta; - if (__sync_bool_compare_and_swap (&this->counter_, current, next)) { + if (__sync_bool_compare_and_swap(&this->counter_, current, next)) { return next; } } } - + private: volatile int counter_; }; @@ -217,7 +216,7 @@ class AtomicLong { for ( ; ; ) { long current = this->counter_; long next = current + delta; - if (__sync_bool_compare_and_swap (&this->counter_, current, next)) { + if (__sync_bool_compare_and_swap(&this->counter_, current, next)) { return next; } } @@ -267,11 +266,8 @@ class AtomicBoolean{ }; +} // namespace tubemq - -} - - -#endif +#endif // TUBEMQ_CLIENT_ATOMIC_DEF_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h b/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h index 0dcf458..41610e1 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/client_config.h @@ -16,20 +16,25 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_CONFIGURE_H_ -#define _TUBEMQ_CLIENT_CONFIGURE_H_ -#include +#ifndef TUBEMQ_CLIENT_CONFIGURE_H_ +#define TUBEMQ_CLIENT_CONFIGURE_H_ + +#include #include #include #include - +#include namespace tubemq { -using namespace std; + +using std::map; +using std::set; +using std::string; +using std::vector; + class BaseConfig { @@ -38,27 +43,27 @@ class BaseConfig { ~BaseConfig(); BaseConfig& operator=(const BaseConfig& target); bool SetMasterAddrInfo(string& err_info, const string& master_addrinfo); - bool SetTlsInfo(string& err_info, bool tls_enable, + bool SetTlsInfo(string& err_info, bool tls_enable, const string& trust_store_path, const string& trust_store_password); - bool SetAuthenticInfo(string& err_info, bool authentic_enable, - const string& usr_name, const string& usr_password); + bool SetAuthenticInfo(string& err_info, bool authentic_enable, + const string& usr_name, const string& usr_password); const string& GetMasterAddrInfo() const; bool IsTlsEnabled(); const string& GetTrustStorePath() const; const string& GetTrustStorePassword() const; bool IsAuthenticEnabled(); const string& GetUsrName() const; - const string& GetUsrPassWord() const; + const string& GetUsrPassWord() const; // set the rpc timout, unit second, duration [8, 300], default 15 seconds; - void SetRpcReadTimeoutSec(int rpc_read_timeout_sec); - int GetRpcReadTimeoutSec(); + void SetRpcReadTimeoutSec(int32_t rpc_read_timeout_sec); + int32_t GetRpcReadTimeoutSec(); // Set the duration of the client's heartbeat cycle, in seconds, the default is 10 seconds - void SetHeartbeatPeriodSec(int heartbeat_period_sec); - int GetHeartbeatPeriodSec(); - void SetMaxHeartBeatRetryTimes(int max_heartbeat_retry_times); - int GetMaxHeartBeatRetryTimes(); - void SetHeartbeatPeriodAftFailSec(int heartbeat_period_afterfail_sec); - int GetHeartbeatPeriodAftFailSec(); + void SetHeartbeatPeriodSec(int32_t heartbeat_period_sec); + int32_t GetHeartbeatPeriodSec(); + void SetMaxHeartBeatRetryTimes(int32_t max_heartbeat_retry_times); + int32_t GetMaxHeartBeatRetryTimes(); + void SetHeartbeatPeriodAftFailSec(int32_t heartbeat_period_afterfail_sec); + int32_t GetHeartbeatPeriodAftFailSec(); string ToString(); private: @@ -72,10 +77,10 @@ class BaseConfig { string tls_trust_store_path_; string tls_trust_store_password_; // other setting - int rpc_read_timeout_sec_; - int heartbeat_period_sec_; - int max_heartbeat_retry_times_; - int heartbeat_period_afterfail_sec_; + int32_t rpc_read_timeout_sec_; + int32_t heartbeat_period_sec_; + int32_t max_heartbeat_retry_times_; + int32_t heartbeat_period_afterfail_sec_; }; @@ -83,72 +88,73 @@ enum ConsumePosition { kConsumeFromFirstOffset = -1, kConsumeFromLatestOffset = 0, kComsumeFromMaxOffsetAlways = 1 -}; +}; // enum ConsumePosition class ConsumerConfig : public BaseConfig { - public: + public: ConsumerConfig(); ~ConsumerConfig(); - ConsumerConfig& operator=(const ConsumerConfig& target); - bool SetGroupConsumeTarget(string& err_info, + ConsumerConfig& operator=(const ConsumerConfig& target); + bool SetGroupConsumeTarget(string& err_info, const string& group_name, const set& subscribed_topicset); - bool SetGroupConsumeTarget(string& err_info, + bool SetGroupConsumeTarget(string& err_info, const string& group_name, const map >& subscribed_topic_and_filter_map); - bool SetGroupConsumeTarget(string& err_info, + bool SetGroupConsumeTarget(string& err_info, const string& group_name, const map >& subscribed_topic_and_filter_map, - const string& session_key, int source_count, bool is_select_big, const map& part_offset_map); + const string& session_key, uint32_t source_count, bool is_select_big, + const map& part_offset_map); const string& GetGroupName() const; const map >& GetSubTopicAndFilterMap() const; void SetConsumePosition(ConsumePosition consume_from_where); const ConsumePosition GetConsumePosition() const; - const int GetMsgNotFoundWaitPeriodMs() const; + const int32_t GetMsgNotFoundWaitPeriodMs() const; void SetMsgNotFoundWaitPeriodMs(int msg_notfound_wait_period_ms); - const int GetMaxSubinfoReportIntvl() const; - void SetMaxSubinfoReportIntvl(int max_subinfo_report_intvl); + const int32_t GetMaxSubinfoReportIntvl() const; + void SetMaxSubinfoReportIntvl(int32_t max_subinfo_report_intvl); bool IsConfirmInLocal(); void SetConfirmInLocal(bool confirm_in_local); bool IsRollbackIfConfirmTimeout(); void setRollbackIfConfirmTimeout(bool is_rollback_if_confirm_timeout); - const int GetWaitPeriodIfConfirmWaitRebalanceMs() const; - void SetWaitPeriodIfConfirmWaitRebalanceMs(int reb_confirm_wait_period_ms); - const int GetMaxConfirmWaitPeriodMs() const; - void SetMaxConfirmWaitPeriodMs(int max_confirm_wait_period_ms); - const int GetShutdownRebWaitPeriodMs() const; - void SetShutdownRebWaitPeriodMs(int wait_period_when_shutdown_ms); + const int32_t GetWaitPeriodIfConfirmWaitRebalanceMs() const; + void SetWaitPeriodIfConfirmWaitRebalanceMs(int32_t reb_confirm_wait_period_ms); + const int32_t GetMaxConfirmWaitPeriodMs() const; + void SetMaxConfirmWaitPeriodMs(int32_t max_confirm_wait_period_ms); + const int32_t GetShutdownRebWaitPeriodMs() const; + void SetShutdownRebWaitPeriodMs(int32_t wait_period_when_shutdown_ms); string ToString(); private: bool setGroupConsumeTarget(string& err_info, bool is_bound_consume, const string& group_name, const map >& subscribed_topic_and_filter_map, - const string& session_key, int source_count, bool is_select_big, const map& part_offset_map); - - - private: + const string& session_key, int32_t source_count, bool is_select_big, + const map& part_offset_map); + + private: string group_name_; map > sub_topic_and_filter_map_; bool is_bound_consume_; string session_key_; - int source_count_; + uint32_t source_count_; bool is_select_big_; - map part_offset_map_; + map part_offset_map_; ConsumePosition consume_position_; - int max_subinfo_report_intvl_; - int msg_notfound_wait_period_ms_; + int32_t max_subinfo_report_intvl_; + int32_t msg_notfound_wait_period_ms_; bool is_confirm_in_local_; bool is_rollback_if_confirm_timout_; - int reb_confirm_wait_period_ms_; - int max_confirm_wait_period_ms_; - int shutdown_reb_wait_period_ms_; + int32_t reb_confirm_wait_period_ms_; + int32_t max_confirm_wait_period_ms_; + int32_t shutdown_reb_wait_period_ms_; }; -} +} // namespace tubemq -#endif +#endif // TUBEMQ_CLIENT_CONFIGURE_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h b/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h index 4e04ec0..9e72aaa 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/const_config.h @@ -16,61 +16,62 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_CONST_CONFIG_H_ -#define _TUBEMQ_CLIENT_CONST_CONFIG_H_ +#ifndef TUBEMQ_CLIENT_CONST_CONFIG_H_ +#define TUBEMQ_CLIENT_CONST_CONFIG_H_ + +#include #include #include namespace tubemq { -using namespace std; +using std::string; + // configuration value setting namespace config { // heartbeat period define -static const int kHeartBeatPeriodDef = 10; -static const int kHeartBeatFailRetryTimesDef = 5; -static const int kHeartBeatSleepPeriodDef = 60; +static const int32_t kHeartBeatPeriodDef = 10; +static const int32_t kHeartBeatFailRetryTimesDef = 5; +static const int32_t kHeartBeatSleepPeriodDef = 60; // max masterAddrInfo length -static const int kMasterAddrInfoMaxLength = 1024; +static const int32_t kMasterAddrInfoMaxLength = 1024; // max TopicName length -static const int kTopicNameMaxLength = 64; +static const int32_t kTopicNameMaxLength = 64; // max Consume GroupName length -static const int kGroupNameMaxLength = 1024; +static const int32_t kGroupNameMaxLength = 1024; // max filter item length -static const int kFilterItemMaxLength = 256; +static const int32_t kFilterItemMaxLength = 256; // max allowed filter item count -static const int kFilterItemMaxCount = 500; +static const int32_t kFilterItemMaxCount = 500; // max session key length -static const int kSessionKeyMaxLength = 1024; +static const int32_t kSessionKeyMaxLength = 1024; // max subscribe info report times -static const int kSubInfoReportMaxIntervalTimes = 6; +static const int32_t kSubInfoReportMaxIntervalTimes = 6; // default message not found response wait period -static const int kMsgNotfoundWaitPeriodMsDef = 200; +static const int32_t kMsgNotfoundWaitPeriodMsDef = 200; // default confirm wait period if rebalance meeting -static const int kRebConfirmWaitPeriodMsDef = 3000; +static const int32_t kRebConfirmWaitPeriodMsDef = 3000; // max confirm wait period anyway -static const int kConfirmWaitPeriodMsMax = 60000; +static const int32_t kConfirmWaitPeriodMsMax = 60000; // default rebalance wait if shutdown meeting -static const int kRebWaitPeriodWhenShutdownMs = 10000; +static const int32_t kRebWaitPeriodWhenShutdownMs = 10000; // max int value -static const int kMaxIntValue = 0x7fffffff; +static const int32_t kMaxIntValue = 0x7fffffff; // max long value -static const long kMaxLongValue = 0x7fffffffffffffffL; +static const int64_t kMaxLongValue = 0x7fffffffffffffffL; // default broker port -static const int kBrokerPortDef = 8123; +static const uint32_t kBrokerPortDef = 8123; // default broker TLS port -static const int kBrokerTlsPortDef = 8124; +static const uint32_t kBrokerTlsPortDef = 8124; // invalid value -static const int kInvalidValue = -2; - +static const int32_t kInvalidValue = -2; } // namespace config @@ -84,18 +85,17 @@ namespace delimiter { static const string kDelimiterAt = "@"; static const string kDelimiterPound = "#"; static const string kDelimiterSemicolon = ";"; - //Double slash + // Double slash static const string kDelimiterDbSlash = "//"; // left square bracket static const string kDelimiterLftSB = "["; // right square bracket static const string kDelimiterRgtSB = "]"; - -} // namespace delimiter + +} // namespace delimiter - -} +} // namespace tubemq -#endif +#endif // TUBEMQ_CLIENT_CONST_CONFIG_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/const_rpc.h b/tubemq-client-twins/tubemq-client-cpp/inc/const_rpc.h index cb89eba..2bc0f30 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/const_rpc.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/const_rpc.h @@ -16,68 +16,73 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_CONST_RPC_H_ -#define _TUBEMQ_CLIENT_CONST_RPC_H_ + +#ifndef TUBEMQ_CLIENT_CONST_RPC_H_ +#define TUBEMQ_CLIENT_CONST_RPC_H_ namespace tubemq { -using namespace std; +#include namespace rpc_config { // constant define - static const int kRpcPrtBeginToken = 0xFF7FF4FE; - static const int kRpcMaxBufferSize = 8192; - static const int kRpcMaxFrameListCnt = (int) ((1024 * 1024 * 8) / kRpcMaxBufferSize); + static const uint32_t kRpcPrtBeginToken = 0xFF7FF4FE; + static const uint32_t kRpcMaxBufferSize = 8192; + static const uint32_t kRpcMaxFrameListCnt = (uint32_t) ((1024 * 1024 * 8) / kRpcMaxBufferSize); + // rpc protocol version - static const int kRpcProtocolVersion = 2; + static const uint32_t kRpcProtocolVersion = 2; + // msg type flag - static const int kRpcFlagMsgRequest = 0x0; - static const int kRpcFlagMsgResponse = 0x1; + static const int32_t kRpcFlagMsgRequest = 0x0; + static const int32_t kRpcFlagMsgResponse = 0x1; + // service type - static const int kMasterService = 1; - static const int kBrokerReadService = 2; - static const int kBrokerWriteService = 3; - static const int kBrokerAdminService = 4; - static const int kMasterAdminService = 5; + static const int32_t kMasterService = 1; + static const int32_t kBrokerReadService = 2; + static const int32_t kBrokerWriteService = 3; + static const int32_t kBrokerAdminService = 4; + static const int32_t kMasterAdminService = 5; + // request method // master rpc method - static const int kMasterMethoddProducerRegister = 1; - static const int kMasterMethoddProducerHeatbeat = 2; - static const int kMasterMethoddProducerClose = 3; - static const int kMasterMethoddConsumerRegister = 4; - static const int kMasterMethoddConsumerHeatbeat = 5; - static const int kMasterMethoddConsumerClose = 6; + static const int32_t kMasterMethoddProducerRegister = 1; + static const int32_t kMasterMethoddProducerHeatbeat = 2; + static const int32_t kMasterMethoddProducerClose = 3; + static const int32_t kMasterMethoddConsumerRegister = 4; + static const int32_t kMasterMethoddConsumerHeatbeat = 5; + static const int32_t kMasterMethoddConsumerClose = 6; + // broker rpc method - static const int kBrokerMethoddProducerRegister = 11; - static const int kBrokerMethoddProducerHeatbeat = 12; - static const int kBrokerMethoddProducerSendMsg = 13; - static const int kBrokerMethoddProducerClose = 14; - static const int kBrokerMethoddConsumerRegister = 15; - static const int kBrokerMethoddConsumerHeatbeat = 16; - static const int kBrokerMethoddConsumerGetMsg = 17; - static const int kBrokerMethoddConsumerCommit = 18; - static const int kBrokerMethoddConsumerClose = 19; + static const int32_t kBrokerMethoddProducerRegister = 11; + static const int32_t kBrokerMethoddProducerHeatbeat = 12; + static const int32_t kBrokerMethoddProducerSendMsg = 13; + static const int32_t kBrokerMethoddProducerClose = 14; + static const int32_t kBrokerMethoddConsumerRegister = 15; + static const int32_t kBrokerMethoddConsumerHeatbeat = 16; + static const int32_t kBrokerMethoddConsumerGetMsg = 17; + static const int32_t kBrokerMethoddConsumerCommit = 18; + static const int32_t kBrokerMethoddConsumerClose = 19; // register operate type - static const int kRegOpTypeRegister = 31; - static const int kRegOpTypeUnReg = 32; + static const int32_t kRegOpTypeRegister = 31; + static const int32_t kRegOpTypeUnReg = 32; // rpc connect node timeout - static const int kRpcConnectTimeoutMs = 3000; - - // rpc timeout define - static const int kRpcTimoutDefSec = 15; - static const int kRpcTimoutMaxSec = 300; - static const int kRpcTimoutMinSec = 8; + static const int32_t kRpcConnectTimeoutMs = 3000; + + // rpc timeout define + static const int32_t kRpcTimoutDefSec = 15; + static const int32_t kRpcTimoutMaxSec = 300; + static const int32_t kRpcTimoutMinSec = 8; -} +} // namespace rpc_config -} +} // namespace tubemq -#endif +#endif // TUBEMQ_CLIENT_CONST_RPC_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/file_ini.h b/tubemq-client-twins/tubemq-client-cpp/inc/file_ini.h index cab7f84..39f9a36 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/file_ini.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/file_ini.h @@ -16,26 +16,30 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_FILE_INI_H_ -#define _TUBEMQ_CLIENT_FILE_INI_H_ +#ifndef TUBEMQ_CLIENT_FILE_INI_H_ +#define TUBEMQ_CLIENT_FILE_INI_H_ + +#include #include #include + namespace tubemq { -using namespace std; +using std::string; +using std::map; + class Fileini { public: Fileini(); ~Fileini(); bool Loadini(string& err_info, const string& file_name); - bool GetValue(string& err_info, const string& sector, + bool GetValue(string& err_info, const string& sector, const string& key, string& value, const string& def); - bool GetValue(string& err_info, const string& sector, - const string& key, int& value, const int def); + bool GetValue(string& err_info, const string& sector, + const string& key, int32_t& value, const int32_t def); private: bool init_flag_; @@ -43,8 +47,7 @@ class Fileini { map > ini_map_; }; - -} +} // namespace tubemq -#endif +#endif // TUBEMQ_CLIENT_FILE_INI_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/inc/flowctrl_def.h index cff7cc6..5c39352 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/flowctrl_def.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/flowctrl_def.h @@ -17,108 +17,113 @@ * under the License. */ -#ifndef _TUBEMQ_CLIENT_FLOW_CONTROL_H_ -#define _TUBEMQ_CLIENT_FLOW_CONTROL_H_ +#ifndef TUBEMQ_CLIENT_FLOW_CONTROL_H_ +#define TUBEMQ_CLIENT_FLOW_CONTROL_H_ -#include +#include +#include #include +#include #include #include -#include -#include #include "atomic_def.h" +#include namespace tubemq { -using namespace std; + +using std::map; +using std::string; +using std::vector; + class FlowCtrlResult { public: FlowCtrlResult(); - FlowCtrlResult(long datasize_limit, int freqms_limit); + FlowCtrlResult(int64_t datasize_limit, int32_t freqms_limit); FlowCtrlResult& operator=(const FlowCtrlResult& target); - void SetDataDltAndFreqLimit(long datasize_limit, int freqms_limit); - void SetDataSizeLimit(long datasize_limit); - void SetFreqMsLimit(int freqms_limit); - long GetDataSizeLimit(); - int GetFreqMsLimit(); + void SetDataDltAndFreqLimit(int64_t datasize_limit, int32_t freqms_limit); + void SetDataSizeLimit(int64_t datasize_limit); + void SetFreqMsLimit(int32_t freqms_limit); + int64_t GetDataSizeLimit(); + int32_t GetFreqMsLimit(); private: - long datasize_limit_; - int freqms_limit_; + int64_t datasize_limit_; + int32_t freqms_limit_; }; class FlowCtrlItem { public: FlowCtrlItem(); - FlowCtrlItem(int type,int zero_cnt,int freqms_limit); - FlowCtrlItem(int type, - int datasize_limit,int freqms_limit,int min_data_filter_freqms); - FlowCtrlItem(int type, int start_time, - int end_time, long datadlt_m, long datasize_limit, int freqms_limit); + FlowCtrlItem(int32_t type, int32_t zero_cnt, int32_t freqms_limit); + FlowCtrlItem(int32_t type, int32_t datasize_limit, + int32_t freqms_limit, int32_t min_data_filter_freqms); + FlowCtrlItem(int32_t type, int32_t start_time, int32_t end_time, + int64_t datadlt_m, int64_t datasize_limit, int32_t freqms_limit); FlowCtrlItem& operator=(const FlowCtrlItem& target); void Clear(); - void ResetFlowCtrlValue(int type, - int datasize_limit,int freqms_limit,int min_data_filter_freqms); - int GetFreLimit(int msg_zero_cnt); - bool GetDataLimit(long datadlt_m, int curr_time, FlowCtrlResult& flowctrl_result); - const int GetType() const { + void ResetFlowCtrlValue(int32_t type, + int32_t datasize_limit, int32_t freqms_limit, int32_t min_data_filter_freqms); + int32_t GetFreLimit(int32_t msg_zero_cnt); + bool GetDataLimit(int64_t datadlt_m, int32_t curr_time, FlowCtrlResult& flowctrl_result); + const int32_t GetType() const { return type_; } - const int GetZeroCnt() const { + const int32_t GetZeroCnt() const { return zero_cnt_; } - const int GetStartTime() const { + const int32_t GetStartTime() const { return start_time_; } - const int GetEndTime() const { + const int32_t GetEndTime() const { return end_time_; } - const long GetDataSizeLimit() const { + const int64_t GetDataSizeLimit() const { return datasize_limit_; } - const int GetFreqMsLimit() const { + const int32_t GetFreqMsLimit() const { return freqms_limit_; } - const long GetDltInM() const { + const int64_t GetDltInM() const { return datadlt_m_; } private: - int type_; - int start_time_; - int end_time_; - long datadlt_m_; - long datasize_limit_; - int freqms_limit_; - int zero_cnt_; + int32_t type_; + int32_t start_time_; + int32_t end_time_; + int64_t datadlt_m_; + int64_t datasize_limit_; + int32_t freqms_limit_; + int32_t zero_cnt_; }; class FlowCtrlRuleHandler { public: FlowCtrlRuleHandler(); ~FlowCtrlRuleHandler(); - void UpdateDefFlowCtrlInfo(bool is_default, - int qrypriority_id, long flowctrl_id, const string& flowctrl_info); - bool GetCurDataLimit(long last_datadlt,FlowCtrlResult& flowctrl_result); - int GetCurFreqLimitTime(int msg_zero_cnt, int received_limit); - int GetMinZeroCnt() { return this->min_zero_cnt_.Get();} - int GetQryPriorityId() { + void UpdateDefFlowCtrlInfo(bool is_default, + int32_t qrypriority_id, int64_t flowctrl_id, const string& flowctrl_info); + bool GetCurDataLimit(int32_t last_datadlt, FlowCtrlResult& flowctrl_result); + int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit); + int32_t GetMinZeroCnt() { return this->min_zero_cnt_.Get();} + int32_t GetQryPriorityId() { return this->qrypriority_id_.Get(); } - void SetQryPriorityId(int qrypriority_id) { + void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); } - long GetFlowCtrlId() { + int64_t GetFlowCtrlId() { return this->flowctrl_id_.Get(); } const FlowCtrlItem& GetFilterCtrlItem() const { return this->filter_ctrl_item_; } - const string& GetFlowCtrlInfo() const { + const string& GetFlowCtrlInfo() const { return this->flowctrl_info_; } @@ -127,17 +132,22 @@ class FlowCtrlRuleHandler { void clearStatisData(); static bool compareFeqQueue(const FlowCtrlItem& queue1, const FlowCtrlItem& queue2); static bool compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2); - bool parseStringMember(string &err_info, const rapidjson::Value& root, + bool parseStringMember(string &err_info, const rapidjson::Value& root, const char* key, string& value, bool compare_value, string required_val); - bool parseLongMember(string &err_info, const rapidjson::Value& root, - const char* key, long& value, bool compare_value, long required_val); - bool parseIntMember(string &err_info, const rapidjson::Value& root, - const char* key, int& value, bool compare_value, int required_val); - bool parseFlowCtrlInfo(const string& flowctrl_info, map >& flowctrl_info_map); - bool parseDataLimit(string& err_info, const rapidjson::Value& root, vector& flowCtrlItems); - bool parseFreqLimit(string& err_info, const rapidjson::Value& root, vector& flowctrl_items); - bool parseLowFetchLimit(string& err_info, const rapidjson::Value& root, vector& flowctrl_items); - bool parseTimeMember(string& err_info, const rapidjson::Value& root, const char* key, int& value); + bool parseLongMember(string &err_info, const rapidjson::Value& root, + const char* key, int64_t& value, bool compare_value, int64_t required_val); + bool parseIntMember(string &err_info, const rapidjson::Value& root, + const char* key, int32_t& value, bool compare_value, int32_t required_val); + bool parseFlowCtrlInfo(const string& flowctrl_info, + map >& flowctrl_info_map); + bool parseDataLimit(string& err_info, + const rapidjson::Value& root, vector& flowCtrlItems); + bool parseFreqLimit(string& err_info, + const rapidjson::Value& root, vector& flowctrl_items); + bool parseLowFetchLimit(string& err_info, + const rapidjson::Value& root, vector& flowctrl_items); + bool parseTimeMember(string& err_info, + const rapidjson::Value& root, const char* key, int32_t& value); private: AtomicLong flowctrl_id_; @@ -148,15 +158,13 @@ class FlowCtrlRuleHandler { AtomicInteger datalimit_start_time_; AtomicInteger datalimit_end_time_; FlowCtrlItem filter_ctrl_item_; - map > flowctrl_rules_; + map > flowctrl_rules_; pthread_rwlock_t configrw_lock_; - long last_update_time_; + int64_t last_update_time_; }; - - -} +} // namespace tubemq -#endif +#endif // TUBEMQ_CLIENT_FLOW_CONTROL_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/logger.h b/tubemq-client-twins/tubemq-client-cpp/inc/logger.h index 8c5f9b3..c2f2e7d 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/logger.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/logger.h @@ -52,8 +52,7 @@ Logger& GetLogger(); class Logger { public: - enum Level - { + enum Level { kTrace = 0, kDebug = 1, kInfo = 2, diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/message.h b/tubemq-client-twins/tubemq-client-cpp/inc/message.h index 20d7fcf..5c8be2c 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/message.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/message.h @@ -16,63 +16,63 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_MESSAGE_H_ -#define _TUBEMQ_CLIENT_MESSAGE_H_ +#ifndef TUBEMQ_CLIENT_MESSAGE_H_ +#define TUBEMQ_CLIENT_MESSAGE_H_ +#include +#include #include #include #include -#include namespace tubemq { -using namespace std; +using std::map; +using std::string; + + class Message { public: Message(); Message(const Message& target); - Message(const string& topic, const char* data, int datalen); + Message(const string& topic, const char* data, uint32_t datalen); virtual ~Message(); Message& operator=(const Message& target); - const long GetMessageId() const; - void SetMessageId(long message_id); + const int64_t GetMessageId() const; + void SetMessageId(int64_t message_id); const string& GetTopic() const; void SetTopic(const string& topic); const char* GetData() const; - int GetDataLength() const; + uint32_t GetDataLength() const; void setData(const char* data, int datalen); - const int GetFlag() const; - void SetFlag(int flag); + const int32_t GetFlag() const; + void SetFlag(int32_t flag); const map& GetProperties() const; - int GetProperties(string& attribute); + int32_t GetProperties(string& attribute); bool HasProperty(const string& key); bool GetProperty(const string& key, string& value); bool GetFilterItem(string& value); - bool AddProperty(string& err_info, const string& key, const string& value); + bool AddProperty(string& err_info, const string& key, const string& value); private: void clearData(); - void copyData(const char* data, int datalen); + void copyData(const char* data, uint32_t datalen); void copyProperties(const map& properties); - private: string topic_; char* data_; - int datalen_; - long message_id_; - int flag_; + uint32_t datalen_; + int64_t message_id_; + int32_t flag_; map properties_; }; -} - - +} // namespace tubemq -#endif +#endif // TUBEMQ_CLIENT_MESSAGE_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h index 6264657..dcc34b0 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/meta_info.h @@ -16,46 +16,50 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_META_INFO_H_ -#define _TUBEMQ_CLIENT_META_INFO_H_ +#ifndef TUBEMQ_CLIENT_META_INFO_H_ +#define TUBEMQ_CLIENT_META_INFO_H_ + +#include #include #include +#include "flowctrl_def.h" -namespace tubemq { -using namespace std; +namespace tubemq { +using std::list; +using std::map; +using std::string; class NodeInfo { public: NodeInfo(); NodeInfo(bool is_broker, const string& node_info); - NodeInfo(const string& node_host, int node_port); - NodeInfo(int node_id, const string& node_host, int node_port); + NodeInfo(const string& node_host, uint32_t node_port); + NodeInfo(int32_t node_id, const string& node_host, uint32_t node_port); ~NodeInfo(); NodeInfo& operator=(const NodeInfo& target); bool operator== (const NodeInfo& target); bool operator< (const NodeInfo& target) const; - const int GetNodeId() const; + const uint32_t GetNodeId() const; const string& GetHost() const; - const int GetPort() const; + const uint32_t GetPort() const; const string& GetAddrInfo() const; const string& GetNodeInfo() const; - + private: void buildStrInfo(); - private: - int node_id_; - string node_host_; - int node_port_; + private: + uint32_t node_id_; + string node_host_; + uint32_t node_port_; // ip:port - string addr_info_; + string addr_info_; // id:ip:port - string node_info_; + string node_info_; }; @@ -64,17 +68,17 @@ class Partition { Partition(); Partition(const string& partition_info); Partition(const NodeInfo& broker_info, const string& part_str); - Partition(const NodeInfo& broker_info, const string& topic, int partition_id); + Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id); ~Partition(); Partition& operator=(const Partition& target); bool operator== (const Partition& target); - const int GetBrokerId() const; + const uint32_t GetBrokerId() const; const string& GetBrokerHost() const; - const int GetBrokerPort() const; + const uint32_t GetBrokerPort() const; const string& GetPartitionKey() const; const string& GetTopic() const; const NodeInfo& GetBrokerInfo() const; - const int GetPartitionId() const; + const uint32_t GetPartitionId() const; const string& ToString() const; private: @@ -83,7 +87,7 @@ class Partition { private: string topic_; NodeInfo broker_info_; - int partition_id_; + uint32_t partition_id_; string partition_key_; string partition_info_; }; @@ -97,11 +101,11 @@ class SubscribeInfo { const string& GetConsumerId() const; const string& GetGroup() const; const Partition& GetPartition() const; - const int GgetBrokerId() const; + const uint32_t GgetBrokerId() const; const string& GetBrokerHost() const; - const int GetBrokerPort() const; + const uint32_t GetBrokerPort() const; const string& GetTopic() const; - const int GetPartitionId() const; + const uint32_t GetPartitionId() const; const string& ToString() const; private: @@ -119,36 +123,31 @@ class ConsumerEvent { public: ConsumerEvent(); ConsumerEvent(const ConsumerEvent& target); - ConsumerEvent(long rebalance_id,int event_type, - const list& subscribeInfo_lst, int event_status); + ConsumerEvent(int64_t rebalance_id, int32_t event_type, + const list& subscribeInfo_lst, int32_t event_status); ConsumerEvent& operator=(const ConsumerEvent& target); - const long GetRebalanceId() const; - const int GetEventType() const; - const int GetEventStatus() const; - void SetEventType(int event_type); - void SetEventStatus(int event_status); + const int64_t GetRebalanceId() const; + const int32_t GetEventType() const; + const int32_t GetEventStatus() const; + void SetEventType(int32_t event_type); + void SetEventStatus(int32_t event_status); const list& GetSubscribeInfoList() const; string ToString(); private: - long rebalance_id_; - int event_type_; - int event_status_; + int64_t rebalance_id_; + int32_t event_type_; + int32_t event_status_; list subscribe_list_; }; - class PartitionExt : public Partition { PartitionExt(); }; +} // namespace tubemq - - - -} - -#endif +#endif // TUBEMQ_CLIENT_META_INFO_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/singleton.h b/tubemq-client-twins/tubemq-client-cpp/inc/singleton.h index af3b0d4..734f4fb 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/singleton.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/singleton.h @@ -20,8 +20,8 @@ #ifndef _TUBEMQ_SINGLETON_H #define _TUBEMQ_SINGLETON_H -#include #include +#include #include #include diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h index efb22d7..707c1fd 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/utils.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/utils.h @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_UTILS_H_ -#define _TUBEMQ_CLIENT_UTILS_H_ +#ifndef TUBEMQ_CLIENT_UTILS_H_ +#define TUBEMQ_CLIENT_UTILS_H_ + +#include #include #include #include @@ -27,8 +28,9 @@ namespace tubemq { -using namespace std; - +using std::map; +using std::string; +using std::vector; class Utils { @@ -38,23 +40,22 @@ class Utils { // split string to vector static void Split(const string& source, vector& result, const string& delimiter); // split string to map - static void Split(const string& source, map& result, + static void Split(const string& source, map& result, const string& delimiter_step1, const string& delimiter_step2); static void Join(const vector& vec, const string& delimiter, string& target); static bool ValidString(string& err_info, const string& source, bool allow_empty, bool pat_match, bool check_max_length, unsigned int maxlen); - static bool ValidGroupName(string &err_info, + static bool ValidGroupName(string &err_info, const string& group_name, string& tgt_group_name); - static bool ValidFilterItem(string& err_info, + static bool ValidFilterItem(string& err_info, const string& src_filteritem, string& tgt_filteritem); - static string Int2str(int data); - static string Long2str(long data); - static int IpToInt(const string& ipv4_addr); - static long GetCurrentTimeMillis(); - + static string Int2str(int32_t data); + static string Long2str(int64_t data); + static uint32_t IpToInt(const string& ipv4_addr); + static int64_t GetCurrentTimeMillis(); }; - -} -#endif +} // namespace tubemq + +#endif // TUBEMQ_CLIENT_UTILS_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/inc/version.h b/tubemq-client-twins/tubemq-client-cpp/inc/version.h index e7ccc62..848a5c8 100644 --- a/tubemq-client-twins/tubemq-client-cpp/inc/version.h +++ b/tubemq-client-twins/tubemq-client-cpp/inc/version.h @@ -16,19 +16,19 @@ * specific language governing permissions and limitations * under the License. */ - -#ifndef _TUBEMQ_CLIENT_VERSION_H_ -#define _TUBEMQ_CLIENT_VERSION_H_ + +#ifndef TUBEMQ_CLIENT_VERSION_H_ +#define TUBEMQ_CLIENT_VERSION_H_ #include namespace tubemq { -using namespace std; +using std::string; -static const String kTubeMQClientVersion = "0.1.0-0.5.0"; +static const string kTubeMQClientVersion = "0.1.0-0.5.0"; -} +} // namespace tubemq -#endif +#endif // TUBEMQ_CLIENT_VERSION_H_ diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc index d3a03ba..3c88eb2 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/client_config.cc @@ -17,12 +17,11 @@ * under the License. */ -#include -#include #include "client_config.h" +#include +#include #include "const_config.h" #include "const_rpc.h" - #include "utils.h" @@ -44,7 +43,7 @@ BaseConfig::BaseConfig() { } BaseConfig::~BaseConfig() { - + // } BaseConfig& BaseConfig::operator=(const BaseConfig& target) { @@ -75,14 +74,14 @@ bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrin stringstream ss; ss << "Illegal parameter: over max "; ss << config::kMasterAddrInfoMaxLength; - ss << " length of master_addrinfo parameter!"; + ss << " length of master_addrinfo parameter!"; err_info = ss.str(); return false; } // parse and verify master address info // master_addrinfo's format like ip1:port1,ip2:port2,ip3:port3 map tgt_address_map; - Utils::Split(master_addrinfo, tgt_address_map, + Utils::Split(master_addrinfo, tgt_address_map, delimiter::kDelimiterComma, delimiter::kDelimiterColon); if (tgt_address_map.empty()) { err_info = "Illegal parameter: master_addrinfo is blank!"; @@ -97,27 +96,27 @@ bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& trust_store_path, const string& trust_store_password) { this->tls_enabled_ = tls_enable; if (tls_enable) { - string trimed_trust_store_path = Utils::Trim(trust_store_path); + string trimed_trust_store_path = Utils::Trim(trust_store_path); if (trimed_trust_store_path.empty()) { err_info = "Illegal parameter: trust_store_path is empty!"; return false; } - string trimed_trust_store_password = Utils::Trim(trust_store_password); + string trimed_trust_store_password = Utils::Trim(trust_store_password); if (trimed_trust_store_password.empty()) { err_info = "Illegal parameter: trust_store_password is empty!"; return false; } - this->tls_trust_store_path_= trimed_trust_store_path; - this->tls_trust_store_password_= trimed_trust_store_password; + this->tls_trust_store_path_ = trimed_trust_store_path; + this->tls_trust_store_password_ = trimed_trust_store_password; } else { this->tls_trust_store_path_ = ""; this->tls_trust_store_password_ = ""; } err_info = "Ok"; - return true; + return true; } -bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, +bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, const string& usr_name, const string& usr_password) { this->auth_enable_ = authentic_enable; if (authentic_enable) { @@ -126,7 +125,7 @@ bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, err_info = "Illegal parameter: usr_name is empty!"; return false; } - string trimed_usr_password = Utils::Trim(usr_password); + string trimed_usr_password = Utils::Trim(usr_password); if (trimed_usr_password.empty()) { err_info = "Illegal parameter: usr_password is empty!"; return false; @@ -179,31 +178,31 @@ void BaseConfig::SetRpcReadTimeoutSec(int rpc_read_timeout_sec) { } } -int BaseConfig::GetRpcReadTimeoutSec() { +int32_t BaseConfig::GetRpcReadTimeoutSec() { return this->rpc_read_timeout_sec_; } -void BaseConfig::SetHeartbeatPeriodSec(int heartbeat_period_sec) { +void BaseConfig::SetHeartbeatPeriodSec(int32_t heartbeat_period_sec) { this->heartbeat_period_sec_ = heartbeat_period_sec; } -int BaseConfig::GetHeartbeatPeriodSec() { +int32_t BaseConfig::GetHeartbeatPeriodSec() { return this->heartbeat_period_sec_; } -void BaseConfig::SetMaxHeartBeatRetryTimes(int max_heartbeat_retry_times) { +void BaseConfig::SetMaxHeartBeatRetryTimes(int32_t max_heartbeat_retry_times) { this->max_heartbeat_retry_times_ = max_heartbeat_retry_times; } -int BaseConfig::GetMaxHeartBeatRetryTimes() { +int32_t BaseConfig::GetMaxHeartBeatRetryTimes() { return this->max_heartbeat_retry_times_; } -void BaseConfig::SetHeartbeatPeriodAftFailSec(int heartbeat_period_afterfail_sec) { +void BaseConfig::SetHeartbeatPeriodAftFailSec(int32_t heartbeat_period_afterfail_sec) { this->heartbeat_period_afterfail_sec_ = heartbeat_period_afterfail_sec; } -int BaseConfig::GetHeartbeatPeriodAftFailSec() { +int32_t BaseConfig::GetHeartbeatPeriodAftFailSec() { return this->heartbeat_period_afterfail_sec_; } @@ -235,30 +234,29 @@ string BaseConfig::ToString() { return ss.str(); } +ConsumerConfig::ConsumerConfig() { + this->group_name_ = ""; + this->is_bound_consume_ = false; + this->session_key_ = ""; + this->source_count_ = 0; + this->is_select_big_ = true; + this->consume_position_ = kConsumeFromLatestOffset; + this->is_confirm_in_local_ = false; + this->is_rollback_if_confirm_timout_ = true; + this->max_subinfo_report_intvl_ = config::kSubInfoReportMaxIntervalTimes; + this->msg_notfound_wait_period_ms_ = config::kMsgNotfoundWaitPeriodMsDef; + this->reb_confirm_wait_period_ms_ = config::kRebConfirmWaitPeriodMsDef; + this->max_confirm_wait_period_ms_ = config::kConfirmWaitPeriodMsMax; + this->shutdown_reb_wait_period_ms_ = config::kRebWaitPeriodWhenShutdownMs; +} - ConsumerConfig::ConsumerConfig() { - this->group_name_ = ""; - this->is_bound_consume_ = false; - this->session_key_ = ""; - this->source_count_ = -1; - this->is_select_big_ = true; - this->consume_position_ = kConsumeFromLatestOffset; - this->is_confirm_in_local_ = false; - this->is_rollback_if_confirm_timout_ = true; - this->max_subinfo_report_intvl_ = config::kSubInfoReportMaxIntervalTimes; - this->msg_notfound_wait_period_ms_ = config::kMsgNotfoundWaitPeriodMsDef; - this->reb_confirm_wait_period_ms_ = config::kRebConfirmWaitPeriodMsDef; - this->max_confirm_wait_period_ms_ = config::kConfirmWaitPeriodMsMax; - this->shutdown_reb_wait_period_ms_ = config::kRebWaitPeriodWhenShutdownMs; - } - - ConsumerConfig::~ConsumerConfig() { - - } +ConsumerConfig::~ConsumerConfig() { + // +} ConsumerConfig& ConsumerConfig::operator=(const ConsumerConfig& target) { if (this != &target) { - // parent class + // parent class BaseConfig::operator =(target); // child class this->group_name_ = target.group_name_; @@ -281,9 +279,9 @@ ConsumerConfig& ConsumerConfig::operator=(const ConsumerConfig& target) { } bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, - const string& group_name, const set& subscribed_topicset) { + const string& group_name, const set& subscribed_topicset) { string tgt_group_name; - bool is_success = Utils::ValidGroupName(err_info,group_name, tgt_group_name); + bool is_success = Utils::ValidGroupName(err_info, group_name, tgt_group_name); if (!is_success) { return false; } @@ -293,17 +291,17 @@ bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, } string topic_name; map > tmp_sub_map; - for (set::iterator it = subscribed_topicset.begin(); + for (set::iterator it = subscribed_topicset.begin(); it != subscribed_topicset.end(); ++it) { topic_name = Utils::Trim(*it); - is_success = Utils::ValidString(err_info, topic_name, + is_success = Utils::ValidString(err_info, topic_name, false, true, true, config::kTopicNameMaxLength); if (!is_success) { err_info = "Illegal parameter: subscribed_topicset's item error, " + err_info; return false; } set tmp_filters; - tmp_sub_map[topic_name] = tmp_filters; + tmp_sub_map[topic_name] = tmp_filters; } this->is_bound_consume_ = false; this->group_name_ = tgt_group_name; @@ -312,31 +310,33 @@ bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, return true; } -bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, +bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, const string& group_name, const map >& subscribed_topic_and_filter_map) { string session_key; int source_count = 0; bool is_select_big = false; - map part_offset_map; - return setGroupConsumeTarget(err_info, false, - group_name, subscribed_topic_and_filter_map, - session_key, source_count, is_select_big, part_offset_map); + map part_offset_map; + return setGroupConsumeTarget(err_info, false, + group_name, subscribed_topic_and_filter_map, + session_key, source_count, is_select_big, part_offset_map); } -bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, +bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, const string& group_name, const map >& subscribed_topic_and_filter_map, - const string& session_key, int source_count, bool is_select_big, const map& part_offset_map) { - return setGroupConsumeTarget(err_info, true, - group_name, subscribed_topic_and_filter_map, - session_key, source_count, is_select_big, part_offset_map); + const string& session_key, uint32_t source_count, bool is_select_big, + const map& part_offset_map) { + return setGroupConsumeTarget(err_info, true, + group_name, subscribed_topic_and_filter_map, + session_key, source_count, is_select_big, part_offset_map); } bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consume, const string& group_name, const map >& subscribed_topic_and_filter_map, - const string& session_key, int source_count, bool is_select_big, const map& part_offset_map) { + const string& session_key, uint32_t source_count, bool is_select_big, + const map& part_offset_map) { // check parameter group_name string tgt_group_name; - bool is_success = Utils::ValidGroupName(err_info,group_name, tgt_group_name); + bool is_success = Utils::ValidGroupName(err_info, group_name, tgt_group_name); if (!is_success) { return false; } @@ -347,14 +347,14 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu } map > tmp_sub_map; map >::const_iterator it_map; - for (it_map = subscribed_topic_and_filter_map.begin(); - it_map != subscribed_topic_and_filter_map.end(); ++it_map) { - int count=0; + for (it_map = subscribed_topic_and_filter_map.begin(); + it_map != subscribed_topic_and_filter_map.end(); ++it_map) { + uint32_t count = 0; string tmp_filteritem; set tgt_filters; // check topic_name info - is_success = Utils::ValidString(err_info, it_map->first, - false, true, true, config::kTopicNameMaxLength); + is_success = Utils::ValidString(err_info, it_map->first, + false, true, true, config::kTopicNameMaxLength); if (!is_success) { stringstream ss; ss << "Check parameter subscribed_topic_and_filter_map error: topic "; @@ -367,7 +367,8 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu string topic_name = Utils::Trim(it_map->first); // check filter info set subscribed_filters = it_map->second; - for (set::iterator it = subscribed_filters.begin(); it != subscribed_filters.end(); ++it) { + for (set::iterator it = subscribed_filters.begin(); + it != subscribed_filters.end(); ++it) { is_success = Utils::ValidFilterItem(err_info, *it, tmp_filteritem); if (!is_success) { stringstream ss; @@ -392,7 +393,7 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu } tmp_sub_map[topic_name] = tgt_filters; } - // check if bound consume + // check if bound consume if (!is_bound_consume) { this->is_bound_consume_ = false; this->group_name_ = tgt_group_name; @@ -402,7 +403,7 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu } // check session_key string tgt_session_key = Utils::Trim(session_key); - if (tgt_session_key.length() == 0 + if (tgt_session_key.length() == 0 || tgt_session_key.length() > config::kSessionKeyMaxLength) { if (tgt_session_key.length() == 0) { err_info = "Illegal parameter: session_key is empty!"; @@ -421,8 +422,8 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu } // check part_offset_map string part_key; - map tmp_parts_map; - map::const_iterator it_part; + map tmp_parts_map; + map::const_iterator it_part; for (it_part = part_offset_map.begin(); it_part != part_offset_map.end(); ++it_part) { vector result; Utils::Split(it_part->first, result, delimiter::kDelimiterColon); @@ -469,7 +470,7 @@ bool ConsumerConfig::setGroupConsumeTarget(string& err_info, bool is_bound_consu // set verified data this->is_bound_consume_ = true; this->group_name_ = tgt_group_name; - this->sub_topic_and_filter_map_ = tmp_sub_map; + this->sub_topic_and_filter_map_ = tmp_sub_map; this->session_key_ = tgt_session_key; this->source_count_ = source_count; this->is_select_big_ = is_select_big; @@ -485,7 +486,7 @@ const string& ConsumerConfig::GetGroupName() const { const map >& ConsumerConfig::GetSubTopicAndFilterMap() const { return this->sub_topic_and_filter_map_; } - + void ConsumerConfig::SetConsumePosition(ConsumePosition consume_from_where) { this->consume_position_ = consume_from_where; } @@ -551,9 +552,9 @@ void ConsumerConfig::SetShutdownRebWaitPeriodMs(int wait_period_when_shutdown_ms } string ConsumerConfig::ToString() { - int i = 0; + int32_t i = 0; stringstream ss; - map::iterator it; + map::iterator it; map >::iterator it_map; // print info @@ -562,15 +563,15 @@ string ConsumerConfig::ToString() { ss << ", group_name_='"; ss << this->group_name_; ss << "', sub_topic_and_filter_map_={"; - for (it_map = this->sub_topic_and_filter_map_.begin(); - it_map != this->sub_topic_and_filter_map_.end(); ++it_map) { + for (it_map = this->sub_topic_and_filter_map_.begin(); + it_map != this->sub_topic_and_filter_map_.end(); ++it_map) { if (i++ > 0) { ss << ","; } ss << "'"; ss << it_map->first; ss << "'=["; - int j=0; + int32_t j = 0; set topic_set = it_map->second; for (set::iterator it = topic_set.begin(); it != topic_set.end(); ++it) { if (j++ > 0) { @@ -585,15 +586,15 @@ string ConsumerConfig::ToString() { ss << "}, is_bound_consume_="; ss << this->is_bound_consume_; ss << ", session_key_='"; - ss << this->session_key_; + ss << this->session_key_; ss << "', source_count_="; - ss << this->source_count_; + ss << this->source_count_; ss << ", is_select_big_="; - ss << this->is_select_big_; - ss << ", part_offset_map_={"; + ss << this->is_select_big_; + ss << ", part_offset_map_={"; i = 0; - for (it = this->part_offset_map_.begin(); - it != this->part_offset_map_.end(); ++it) { + for (it = this->part_offset_map_.begin(); + it != this->part_offset_map_.end(); ++it) { if (i++ > 0) { ss << ","; } @@ -622,5 +623,5 @@ string ConsumerConfig::ToString() { return ss.str(); } -} +} // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc index df0cdf3..6400359 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc @@ -17,12 +17,13 @@ * under the License. */ -#include -#include -#include -#include "utils.h" #include "file_ini.h" +#include +#include +#include #include "const_config.h" +#include "utils.h" + namespace tubemq { @@ -60,8 +61,8 @@ bool Fileini::Loadini(string& err_info, const string& file_name) { while (getline(conf_file, line_str)) { // check if a comment line_str = Utils::Trim(line_str); - if (line_str.empty() - || line_str.find(delimiter::kDelimiterDbSlash) == 0 + if (line_str.empty() + || line_str.find(delimiter::kDelimiterDbSlash) == 0 || line_str.find(delimiter::kDelimiterSemicolon) == 0) { continue; } @@ -69,8 +70,8 @@ bool Fileini::Loadini(string& err_info, const string& file_name) { lftsb_pos = line_str.find(delimiter::kDelimiterLftSB); rgtsb_pos = line_str.find(delimiter::kDelimiterRgtSB); if (lftsb_pos != string::npos && rgtsb_pos != string::npos) { - sector = line_str.substr(lftsb_pos + (delimiter::kDelimiterLftSB).size(), - rgtsb_pos - (delimiter::kDelimiterRgtSB).size()); + sector = line_str.substr(lftsb_pos + (delimiter::kDelimiterLftSB).size(), + rgtsb_pos - (delimiter::kDelimiterRgtSB).size()); sector = Utils::Trim(sector); continue; } @@ -107,7 +108,7 @@ bool Fileini::Loadini(string& err_info, const string& file_name) { return true; } -bool Fileini::GetValue(string& err_info, const string& sector, +bool Fileini::GetValue(string& err_info, const string& sector, const string& key, string& value, const string& def) { if (!this->init_flag_) { err_info = "Please load configure file first!"; @@ -132,8 +133,8 @@ bool Fileini::GetValue(string& err_info, const string& sector, return true; } -bool Fileini::GetValue(string& err_info, const string& sector, - const string& key, int& value, const int def) { +bool Fileini::GetValue(string& err_info, const string& sector, + const string& key, int32_t& value, const int32_t def) { string val_str; string def_str = Utils::Int2str(def); bool result = GetValue(err_info, sector, key, val_str, def_str); @@ -144,6 +145,6 @@ bool Fileini::GetValue(string& err_info, const string& sector, return true; } -} +} // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc index bdc8138..e59b606 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc @@ -17,14 +17,14 @@ * under the License. */ - #include +#include "flowctrl_def.h" +#include #include #include #include -#include "utils.h" -#include "logger.h" #include "const_config.h" -#include "flowctrl_def.h" +#include "logger.h" +#include "utils.h" @@ -32,41 +32,41 @@ namespace tubemq { FlowCtrlResult::FlowCtrlResult() { - this->datasize_limit_ = config::kMaxIntValue; + this->datasize_limit_ = config::kMaxIntValue; this->freqms_limit_ = 0; } -FlowCtrlResult::FlowCtrlResult(long datasize_limit, int freqms_limit) { - this->datasize_limit_ = datasize_limit; +FlowCtrlResult::FlowCtrlResult(int64_t datasize_limit, int32_t freqms_limit) { + this->datasize_limit_ = datasize_limit; this->freqms_limit_ = freqms_limit; } FlowCtrlResult& FlowCtrlResult::operator=(const FlowCtrlResult& target) { - if (this == &target) + if (this == &target) return *this; - this->datasize_limit_ = target.datasize_limit_; + this->datasize_limit_ = target.datasize_limit_; this->freqms_limit_ = target.freqms_limit_; return *this; } -void FlowCtrlResult::SetDataDltAndFreqLimit(long datasize_limit, int freqms_limit) { +void FlowCtrlResult::SetDataDltAndFreqLimit(int64_t datasize_limit, int32_t freqms_limit) { this->datasize_limit_ = datasize_limit; this->freqms_limit_ = freqms_limit; } -void FlowCtrlResult::SetDataSizeLimit(long datasize_limit) { +void FlowCtrlResult::SetDataSizeLimit(int64_t datasize_limit) { this->datasize_limit_ = datasize_limit; } -void FlowCtrlResult::SetFreqMsLimit(int freqms_limit) { +void FlowCtrlResult::SetFreqMsLimit(int32_t freqms_limit) { this->freqms_limit_ = freqms_limit; } -long FlowCtrlResult::GetDataSizeLimit() { +int64_t FlowCtrlResult::GetDataSizeLimit() { return this->datasize_limit_; } -int FlowCtrlResult::GetFreqMsLimit() { +int32_t FlowCtrlResult::GetFreqMsLimit() { return this->freqms_limit_; } @@ -81,7 +81,8 @@ FlowCtrlItem::FlowCtrlItem() { this->zero_cnt_ = config::kInvalidValue; } -FlowCtrlItem::FlowCtrlItem(int type,int zero_cnt,int freqms_limit) { +FlowCtrlItem::FlowCtrlItem(int32_t type, + int32_t zero_cnt, int32_t freqms_limit) { this->type_ = type; this->start_time_ = 2500; this->end_time_ = config::kInvalidValue; @@ -91,8 +92,8 @@ FlowCtrlItem::FlowCtrlItem(int type,int zero_cnt,int freqms_limit) { this->zero_cnt_ = zero_cnt; } -FlowCtrlItem::FlowCtrlItem(int type, - int datasize_limit,int freqms_limit,int min_data_filter_freqms) { +FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t datasize_limit, + int32_t freqms_limit, int32_t min_data_filter_freqms) { this->type_ = type; this->start_time_ = 2500; this->end_time_ = config::kInvalidValue; @@ -102,8 +103,8 @@ FlowCtrlItem::FlowCtrlItem(int type, this->zero_cnt_ = min_data_filter_freqms; } -FlowCtrlItem::FlowCtrlItem(int type, int start_time, int end_time, - long datadlt_m, long datasize_limit, int freqms_limit) { +FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t start_time, int32_t end_time, + int64_t datadlt_m, int64_t datasize_limit, int32_t freqms_limit) { this->type_ = type; this->start_time_ = start_time; this->end_time_ = end_time; @@ -114,7 +115,7 @@ FlowCtrlItem::FlowCtrlItem(int type, int start_time, int end_time, } FlowCtrlItem& FlowCtrlItem::operator=(const FlowCtrlItem& target) { - if (this == &target) + if (this == &target) return *this; this->type_ = target.type_; this->start_time_ = target.start_time_; @@ -126,7 +127,7 @@ FlowCtrlItem& FlowCtrlItem::operator=(const FlowCtrlItem& target) { return *this; } -int FlowCtrlItem::GetFreLimit(int msg_zero_cnt) { +int32_t FlowCtrlItem::GetFreLimit(int32_t msg_zero_cnt) { if (this->type_ != 1) { return -1; } @@ -136,8 +137,8 @@ int FlowCtrlItem::GetFreLimit(int msg_zero_cnt) { return -1; } -void FlowCtrlItem::ResetFlowCtrlValue(int type, - int datasize_limit,int freqms_limit,int min_data_filter_freqms) { +void FlowCtrlItem::ResetFlowCtrlValue(int32_t type, int32_t datasize_limit, + int32_t freqms_limit, int32_t min_data_filter_freqms) { this->type_ = type; this->start_time_ = 2500; this->end_time_ = config::kInvalidValue; @@ -157,15 +158,16 @@ void FlowCtrlItem::Clear() { this->zero_cnt_ = config::kInvalidValue; } -bool FlowCtrlItem::GetDataLimit(long datadlt_m, int curr_time, FlowCtrlResult& flowctrl_result) { - if (this->type_ != 0 || datadlt_m <= this->datadlt_m_ ) { +bool FlowCtrlItem::GetDataLimit(int64_t datadlt_m, + int32_t curr_time, FlowCtrlResult& flowctrl_result) { + if (this->type_ != 0 || datadlt_m <= this->datadlt_m_) { return false; } - if (curr_time < this->start_time_ + if (curr_time < this->start_time_ || curr_time > this->end_time_) { return false; } - flowctrl_result.SetDataDltAndFreqLimit(this->datasize_limit_,this->freqms_limit_); + flowctrl_result.SetDataDltAndFreqLimit(this->datasize_limit_, this->freqms_limit_); return true; } @@ -179,21 +181,21 @@ FlowCtrlRuleHandler::FlowCtrlRuleHandler() { this->datalimit_start_time_.Set(2500); this->datalimit_end_time_.Set(config::kInvalidValue); this->last_update_time_ = Utils::GetCurrentTimeMillis(); - pthread_rwlock_init(&configrw_lock_, NULL); + pthread_rwlock_init(&configrw_lock_, NULL); } FlowCtrlRuleHandler::~FlowCtrlRuleHandler() { pthread_rwlock_destroy(&configrw_lock_); } -void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, - int qrypriority_id, long flowctrl_id, const string& flowctrl_info) { +void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, + int32_t qrypriority_id, int64_t flowctrl_id, const string& flowctrl_info) { bool result; - map > tmp_flowctrl_map; + map > tmp_flowctrl_map; if (flowctrl_id == this->flowctrl_id_.Get()) { return; } - long curr_flowctrl_id = this->flowctrl_id_.Get(); + int64_t curr_flowctrl_id = this->flowctrl_id_.Get(); if (flowctrl_info.length() > 0) { result = parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map); } @@ -212,12 +214,13 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, this->last_update_time_ = Utils::GetCurrentTimeMillis(); pthread_rwlock_unlock(&this->configrw_lock_); if (is_default) { - LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id, flowctrl_id); + LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", + curr_flowctrl_id, flowctrl_id); } else { - LOG_INFO("[Flow Ctrl] Group FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id, flowctrl_id); + LOG_INFO("[Flow Ctrl] Group FlowCtrl's flowctrl_id from %ld to %ld\n", + curr_flowctrl_id, flowctrl_id); } return; - } @@ -262,7 +265,7 @@ void FlowCtrlRuleHandler::initialStatisData() { } it_vec->GetDataSizeLimit(); this->filter_ctrl_item_.ResetFlowCtrlValue(3, - (int)(it_vec->GetDataSizeLimit()),it_vec->GetFreqMsLimit(),it_vec->GetZeroCnt()); + (int)(it_vec->GetDataSizeLimit()), it_vec->GetFreqMsLimit(), it_vec->GetZeroCnt()); } } } @@ -276,15 +279,15 @@ void FlowCtrlRuleHandler::clearStatisData() { this->filter_ctrl_item_.Clear(); } -bool FlowCtrlRuleHandler::GetCurDataLimit(long last_datadlt, FlowCtrlResult& flowctrl_result) { +bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) { struct tm utc_tm; vector::iterator it_vec; map >::iterator it_map; - time_t cur_time=time(NULL); + time_t cur_time = time(NULL); - gmtime_r(&cur_time,&utc_tm); + gmtime_r(&cur_time, &utc_tm); int curr_time = (utc_tm.tm_hour+8)%24 * 100 + utc_tm.tm_min; - if ((last_datadlt < this->min_datadlt_limt_.Get()) + if ((last_datadlt < this->min_datadlt_limt_.Get()) || (curr_time < this->datalimit_start_time_.Get()) || (curr_time > this->datalimit_end_time_.Get())) { return false; @@ -294,16 +297,15 @@ bool FlowCtrlRuleHandler::GetCurDataLimit(long last_datadlt, FlowCtrlResult& flo return false; } for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) { - if (it_vec->GetDataLimit(last_datadlt,curr_time,flowctrl_result)) { + if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) { return true; } } return false; } -int FlowCtrlRuleHandler::GetCurFreqLimitTime(int msg_zero_cnt, int received_limit) -{ - int rule_val = -2; +int FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) { + int32_t rule_val = -2; vector::iterator it_vec; map >::iterator it_map; @@ -316,8 +318,7 @@ int FlowCtrlRuleHandler::GetCurFreqLimitTime(int msg_zero_cnt, int received_limi } for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) { rule_val = it_vec->GetFreLimit(msg_zero_cnt); - if (rule_val >= 0) - { + if (rule_val >= 0) { return rule_val; } } @@ -336,26 +337,26 @@ bool FlowCtrlRuleHandler::compareFeqQueue(const FlowCtrlItem& queue1, const Flow return (queue1.GetZeroCnt() < queue2.GetZeroCnt()); } -bool FlowCtrlRuleHandler::parseFlowCtrlInfo(const string& flowctrl_info, - map >& flowctrl_info_map) { - int type; +bool FlowCtrlRuleHandler::parseFlowCtrlInfo(const string& flowctrl_info, + map >& flowctrl_info_map) { + int32_t type; string err_info; stringstream ss; rapidjson::Document doc; // check flowctrl info length - if (flowctrl_info.length() == 0){ + if (flowctrl_info.length() == 0) { return false; } // parse flowctrl info if (doc.Parse(flowctrl_info.c_str()).HasParseError()) { - LOG_ERROR("Parsing flowCtrlInfo failure! flowctrl_info=%s\n",flowctrl_info.c_str()); + LOG_ERROR("Parsing flowCtrlInfo failure! flowctrl_info=%s\n", flowctrl_info.c_str()); return false; } if (!doc.IsArray()) { - LOG_ERROR("flowCtrlInfo's value must be array! flowctrl_info=%s\n",flowctrl_info.c_str()); + LOG_ERROR("flowCtrlInfo's value must be array! flowctrl_info=%s\n", flowctrl_info.c_str()); return false; } - for (unsigned int i = 0; i < doc.Size(); i++) { + for (uint32_t i = 0; i < doc.Size(); i++) { vector flowctrl_item_vec; const rapidjson::Value& node_item = doc[i]; if (!node_item.IsObject()) { @@ -366,42 +367,42 @@ bool FlowCtrlRuleHandler::parseFlowCtrlInfo(const string& flowctrl_info, ss << err_info; ss << " of type field in parse flowctrl_info!"; err_info = ss.str(); - LOG_ERROR("parse flowCtrlInfo failure %s", err_info.c_str()); + LOG_ERROR("parse flowCtrlInfo failure %s", err_info.c_str()); return false; - } + } if (type < 0 || type > 3) { ss << "type value must in [0,1,2,3] in index("; ss << i; ss << ") of flowctrl_info value!"; err_info = ss.str(); - LOG_ERROR("parse flowCtrlInfo failure %s",err_info.c_str()); + LOG_ERROR("parse flowCtrlInfo failure %s", err_info.c_str()); return false; } - + switch (type) { case 1: { if (FlowCtrlRuleHandler::parseFreqLimit(err_info, node_item, flowctrl_item_vec)) { - flowctrl_info_map[1]=flowctrl_item_vec; + flowctrl_info_map[1] = flowctrl_item_vec; } else { - LOG_ERROR("parse flowCtrlInfo's freqLimit failure: %s",err_info.c_str()); + LOG_ERROR("parse flowCtrlInfo's freqLimit failure: %s", err_info.c_str()); } } break; - + case 3: { if (FlowCtrlRuleHandler::parseLowFetchLimit(err_info, node_item, flowctrl_item_vec)) { - flowctrl_info_map[3]=flowctrl_item_vec; + flowctrl_info_map[3] = flowctrl_item_vec; } else { - LOG_ERROR("parse flowCtrlInfo's lowFetchLimit failure: %s",err_info.c_str()); + LOG_ERROR("parse flowCtrlInfo's lowFetchLimit failure: %s", err_info.c_str()); } } break; case 0: { if (FlowCtrlRuleHandler::parseDataLimit(err_info, node_item, flowctrl_item_vec)) { - flowctrl_info_map[0]=flowctrl_item_vec; + flowctrl_info_map[0] = flowctrl_item_vec; } else { - LOG_ERROR("parse flowCtrlInfo's dataLimit failure: %s",err_info.c_str()); + LOG_ERROR("parse flowCtrlInfo's dataLimit failure: %s", err_info.c_str()); } } break; @@ -409,13 +410,13 @@ bool FlowCtrlRuleHandler::parseFlowCtrlInfo(const string& flowctrl_info, default: break; } - } + } return true; } -bool FlowCtrlRuleHandler::parseDataLimit(string& err_info, +bool FlowCtrlRuleHandler::parseDataLimit(string& err_info, const rapidjson::Value& root, vector& flowctrl_items) { - int type_val; + int32_t type_val; stringstream ss; string attr_sep = delimiter::kDelimiterColon; string::size_type pos1; @@ -437,12 +438,12 @@ bool FlowCtrlRuleHandler::parseDataLimit(string& err_info, } // parse rule info const rapidjson::Value& obj_set = root["rule"]; - for (unsigned int index = 0 ; index < obj_set.Size() ; index++) { - int start_time = 0; - int end_time = 0; - long datadlt_m = 0; - long datasize_limit = 0; - int freqms_limit = 0; + for (uint32_t index = 0 ; index < obj_set.Size() ; index++) { + int32_t start_time = 0; + int32_t end_time = 0; + int64_t datadlt_m = 0; + int64_t datasize_limit = 0; + int32_t freqms_limit = 0; const rapidjson::Value& node_item = obj_set[index]; if (!node_item.IsObject()) { err_info = "Illegal rule'value item, must be dict type"; @@ -489,7 +490,7 @@ bool FlowCtrlRuleHandler::parseDataLimit(string& err_info, ss << ") of data limit rule!"; err_info = ss.str(); return false; - } + } datasize_limit = datasize_limit * 1024 * 1024; if (!parseIntMember(err_info, node_item, "freqInMs", freqms_limit, false, -1)) { ss << "freqInMs key is required in index("; @@ -504,20 +505,21 @@ bool FlowCtrlRuleHandler::parseDataLimit(string& err_info, ss << ") of data limit rule!"; err_info = ss.str(); return false; - } - FlowCtrlItem flowctrl_item(0, start_time, end_time, datadlt_m, datasize_limit, freqms_limit); + } + FlowCtrlItem flowctrl_item(0, start_time, + end_time, datadlt_m, datasize_limit, freqms_limit); flowctrl_items.push_back(flowctrl_item); } if (!flowctrl_items.empty()) { std::sort(flowctrl_items.begin(), flowctrl_items.end(), compareDataLimitQueue); } err_info = "Ok"; - return true; + return true; } -bool FlowCtrlRuleHandler::parseFreqLimit(string& err_info, +bool FlowCtrlRuleHandler::parseFreqLimit(string& err_info, const rapidjson::Value& root, vector& flowctrl_items) { - int type_val; + int32_t type_val; stringstream ss; if (!parseIntMember(err_info, root, "type", type_val, true, 1)) { @@ -537,9 +539,9 @@ bool FlowCtrlRuleHandler::parseFreqLimit(string& err_info, } // parse rule info const rapidjson::Value& obj_set = root["rule"]; - for (unsigned int i = 0 ; i < obj_set.Size() ; i++) { - int zeroCnt = -2; - int freqms_limit = -2; + for (uint32_t i = 0 ; i < obj_set.Size() ; i++) { + int32_t zeroCnt = -2; + int32_t freqms_limit = -2; const rapidjson::Value& node_item = obj_set[i]; if (!node_item.IsObject()) { err_info = "Illegal rule'value item, must be dict type"; @@ -566,12 +568,12 @@ bool FlowCtrlRuleHandler::parseFreqLimit(string& err_info, std::sort(flowctrl_items.begin(), flowctrl_items.end(), compareFeqQueue); } err_info = "Ok"; - return true; + return true; } -bool FlowCtrlRuleHandler::parseLowFetchLimit(string& err_info, +bool FlowCtrlRuleHandler::parseLowFetchLimit(string& err_info, const rapidjson::Value& root, vector& flowctrl_items) { - int type_val; + int32_t type_val; stringstream ss; if (!parseIntMember(err_info, root, "type", type_val, true, 3)) { ss << "Decode Failure: "; @@ -590,10 +592,10 @@ bool FlowCtrlRuleHandler::parseLowFetchLimit(string& err_info, } // parse rule info const rapidjson::Value& node_item = root["rule"]; - for (unsigned int i = 0 ; i < node_item.Size() ; i++) { - int norm_freq_ms = 0; - int filter_freq_ms = 0; - int min_filter_freq_ms = 0; + for (uint32_t i = 0 ; i < node_item.Size() ; i++) { + int32_t norm_freq_ms = 0; + int32_t filter_freq_ms = 0; + int32_t min_filter_freq_ms = 0; FlowCtrlItem flowctrl_item; const rapidjson::Value& node_item = node_item[i]; if (!node_item.IsObject()) { @@ -602,16 +604,16 @@ bool FlowCtrlRuleHandler::parseLowFetchLimit(string& err_info, } if (node_item.HasMember("filterFreqInMs") || node_item.HasMember("minDataFilterFreqInMs")) { - if (!parseIntMember(err_info, node_item, - "filterFreqInMs", filter_freq_ms, false, -1)) { + if (!parseIntMember(err_info, node_item, + "filterFreqInMs", filter_freq_ms, false, -1)) { ss << "Decode Failure: "; ss << err_info; ss << " of filterFreqInMs field in parse low fetch limit!"; err_info = ss.str(); return false; } - if (!parseIntMember(err_info, node_item, - "minDataFilterFreqInMs", min_filter_freq_ms, false, -1)) { + if (!parseIntMember(err_info, node_item, + "minDataFilterFreqInMs", min_filter_freq_ms, false, -1)) { ss << "Decode Failure: "; ss << err_info; ss << " of minDataFilterFreqInMs field in parse low fetch limit!"; @@ -644,8 +646,8 @@ bool FlowCtrlRuleHandler::parseLowFetchLimit(string& err_info, } } if (node_item.HasMember("normFreqInMs")) { - if (!parseIntMember(err_info, node_item, - "normFreqInMs", norm_freq_ms, false, -1)) { + if (!parseIntMember(err_info, node_item, + "normFreqInMs", norm_freq_ms, false, -1)) { ss << "Decode Failure: "; ss << err_info; ss << " of normFreqInMs field in parse low fetch limit!"; @@ -659,17 +661,19 @@ bool FlowCtrlRuleHandler::parseLowFetchLimit(string& err_info, ss << ") of low fetch limit rule!"; err_info = ss.str(); return false; - } + } } - flowctrl_item.ResetFlowCtrlValue(3,norm_freq_ms,filter_freq_ms,min_filter_freq_ms); + flowctrl_item.ResetFlowCtrlValue(3, + norm_freq_ms, filter_freq_ms, min_filter_freq_ms); flowctrl_items.push_back(flowctrl_item); } err_info = "Ok"; return true; } -bool FlowCtrlRuleHandler::parseStringMember(string& err_info, const rapidjson::Value& root, - const char* key, string& value, bool compare_value, string required_val) { +bool FlowCtrlRuleHandler::parseStringMember(string& err_info, + const rapidjson::Value& root, const char* key, string& value, + bool compare_value, string required_val) { // check key if exist if (!root.HasMember(key)) { err_info = "Field not existed"; @@ -679,7 +683,7 @@ bool FlowCtrlRuleHandler::parseStringMember(string& err_info, const rapidjson::V err_info = "Illegal value, must be string type"; return false; } - + if (compare_value) { if (root[key].GetString() != required_val) { err_info = "Illegal value, not required value content"; @@ -690,8 +694,9 @@ bool FlowCtrlRuleHandler::parseStringMember(string& err_info, const rapidjson::V return true; } -bool FlowCtrlRuleHandler::parseLongMember(string& err_info, const rapidjson::Value& root, - const char* key, long& value, bool compare_value, long required_val) { +bool FlowCtrlRuleHandler::parseLongMember(string& err_info, + const rapidjson::Value& root, const char* key, int64_t& value, + bool compare_value, int64_t required_val) { if (!root.HasMember(key)) { err_info = "Field not existed"; return false; @@ -701,17 +706,18 @@ bool FlowCtrlRuleHandler::parseLongMember(string& err_info, const rapidjson::Val return false; } if (compare_value) { - if ((long)root[key].GetInt64() != required_val) { + if ((int64_t)root[key].GetInt64() != required_val) { err_info = "Illegal value, not required value content"; return false; } } - value = (long)root[key].GetInt64(); + value = (int64_t)root[key].GetInt64(); return true; } -bool FlowCtrlRuleHandler::parseIntMember(string& err_info, const rapidjson::Value& root, - const char* key, int& value, bool compare_value, int required_val) { +bool FlowCtrlRuleHandler::parseIntMember(string& err_info, + const rapidjson::Value& root, const char* key, int32_t& value, + bool compare_value, int32_t required_val) { if (!root.HasMember(key)) { err_info = "Field not existed"; return false; @@ -730,8 +736,8 @@ bool FlowCtrlRuleHandler::parseIntMember(string& err_info, const rapidjson::Valu return true; } -bool FlowCtrlRuleHandler::parseTimeMember(string& err_info, - const rapidjson::Value& root, const char* key, int& value) { +bool FlowCtrlRuleHandler::parseTimeMember(string& err_info, + const rapidjson::Value& root, const char* key, int32_t& value) { // check key if exist stringstream ss; if (!root.HasMember(key)) { @@ -759,29 +765,28 @@ bool FlowCtrlRuleHandler::parseTimeMember(string& err_info, err_info = ss.str(); return false; } - string sub_str_1 = str_value.substr(0,pos1); - string sub_str_2 = - str_value.substr(pos1+attr_sep.size(),str_value.size()); - int in_hour = atoi(sub_str_1.c_str()); - int in_minute = atoi(sub_str_2.c_str()); + string sub_str_1 = str_value.substr(0, pos1); + string sub_str_2 = str_value.substr(pos1 + attr_sep.size(), str_value.size()); + int32_t in_hour = atoi(sub_str_1.c_str()); + int32_t in_minute = atoi(sub_str_2.c_str()); if (in_hour < 0 || in_hour > 24) { ss << "field "; ss << key; ss << " -hour value must in [0,23]!"; err_info = ss.str(); - return false; + return false; } if (in_minute < 0 || in_minute > 59) { ss << "field "; ss << key; ss << " -minute value must in [0,59]!"; err_info = ss.str(); - return false; + return false; } value = in_hour * 100 + in_minute; return true; } -} +} // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/logger.cc b/tubemq-client-twins/tubemq-client-cpp/src/logger.cc index c1860df..a9d5df3 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/logger.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/logger.cc @@ -18,13 +18,11 @@ */ #include "logger.h" - #include #include #include #include #include - #include namespace tubemq { diff --git a/tubemq-client-twins/tubemq-client-cpp/src/message.cc b/tubemq-client-twins/tubemq-client-cpp/src/message.cc index e737a41..d57fb5a 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/message.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/message.cc @@ -17,19 +17,19 @@ * under the License. */ -#include -#include - #include "message.h" -#include "utils.h" +#include +#include #include "const_config.h" +#include "utils.h" + namespace tubemq { // message flag's properties settings -static const int kMsgFlagIncProperties = 0x01; +static const int32_t kMsgFlagIncProperties = 0x01; // reserved property key Filter Item static const string kRsvPropKeyFilterItem = "$msgType$"; // reserved property key message send time @@ -53,7 +53,7 @@ Message::Message(const Message& target) { this->flag_ = target.flag_; } -Message::Message(const string& topic, const char* data, int datalen) { +Message::Message(const string& topic, const char* data, uint32_t datalen) { this->topic_ = topic; this->flag_ = 0; this->message_id_ = config::kInvalidValue; @@ -66,7 +66,7 @@ Message::~Message() { } Message& Message::operator=(const Message& target) { - if (this == &target) + if (this == &target) return *this; this->topic_ = target.topic_; this->message_id_ = target.message_id_; @@ -77,11 +77,11 @@ Message& Message::operator=(const Message& target) { return *this; } -const long Message::GetMessageId() const { +const uint64_t Message::GetMessageId() const { return this->message_id_; } -void Message::SetMessageId(long message_id) { +void Message::SetMessageId(int64_t message_id) { this->message_id_ = message_id; } @@ -97,20 +97,20 @@ const char* Message::GetData() const { return this->data_; } -int Message::GetDataLength() const { +uint32_t Message::GetDataLength() const { return this->datalen_; } -void Message::setData(const char* data, int datalen) { +void Message::setData(const char* data, uint32_t datalen) { clearData(); copyData(data, datalen); } -const int Message::GetFlag() const { +const int32_t Message::GetFlag() const { return this->flag_; } -void Message::SetFlag(int flag) { +void Message::SetFlag(int32_t flag) { this->flag_ = flag; } @@ -118,7 +118,7 @@ const map& Message::GetProperties() const { return this->properties_; } -int Message::GetProperties(string& attribute) { +int32_t Message::GetProperties(string& attribute) { attribute.clear(); map::iterator it_map; for (it_map = this->properties_.begin(); it_map != this->properties_.end(); ++it_map) { @@ -141,7 +141,7 @@ bool Message::HasProperty(const string& key) { return true; } } - return false; + return false; } bool Message::GetProperty(const string& key, string& value) { @@ -154,7 +154,7 @@ bool Message::GetProperty(const string& key, string& value) { return true; } } - return false; + return false; } bool Message::GetFilterItem(string& value) { @@ -168,7 +168,7 @@ bool Message::AddProperty(string& err_info, const string& key, const string& val err_info = "Not allowed null value of parmeter key or value"; return false; } - if ((string::npos != trimed_key.find(delimiter::kDelimiterComma)) + if ((string::npos != trimed_key.find(delimiter::kDelimiterComma)) ||(string::npos != trimed_key.find(delimiter::kDelimiterEqual))) { stringstream ss; ss << "Reserved token '"; @@ -179,7 +179,7 @@ bool Message::AddProperty(string& err_info, const string& key, const string& val err_info = ss.str(); return false; } - if ((string::npos != trimed_value.find(delimiter::kDelimiterComma)) + if ((string::npos != trimed_value.find(delimiter::kDelimiterComma)) ||(string::npos != trimed_value.find(delimiter::kDelimiterEqual))) { stringstream ss; ss << "Reserved token '"; @@ -190,8 +190,8 @@ bool Message::AddProperty(string& err_info, const string& key, const string& val err_info = ss.str(); return false; } - if (trimed_key == kRsvPropKeyFilterItem - || trimed_key == kRsvPropKeyMsgTime) { + if (trimed_key == kRsvPropKeyFilterItem + || trimed_key == kRsvPropKeyMsgTime) { stringstream ss; ss << "Reserved token '"; ss << kRsvPropKeyFilterItem; @@ -218,7 +218,7 @@ void Message::clearData() { } } -void Message::copyData(const char* data, int datalen) { +void Message::copyData(const char* data, uint32_t datalen) { if (data == NULL) { this->data_ = NULL; this->datalen_ = 0; @@ -242,7 +242,7 @@ void Message::copyProperties(const map& properties) { } -} +} // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc index 8024a6a..ebd75d1 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc @@ -17,21 +17,25 @@ * under the License. */ +#include "meta_info.h" +#include #include #include -#include -#include "utils.h" -#include "meta_info.h" #include "const_config.h" +#include "utils.h" namespace tubemq { +using std::vector; +using std::sstream; + + NodeInfo::NodeInfo() { - this->node_id_ = config::kInvalidValue; + this->node_id_ = 0; this->node_host_ = " "; - this->node_port_ = config::kInvalidValue; + this->node_port_ = config::kBrokerPortDef; buildStrInfo(); } @@ -43,11 +47,11 @@ NodeInfo::NodeInfo(bool is_broker, const string& node_info) { this->node_id_ = atoi(result[0].c_str()); this->node_host_ = result[1]; this->node_port_ = config::kBrokerPortDef; - if (result.size() >= 3){ + if (result.size() >= 3) { this->node_port_ = atoi(result[2].c_str()); } } else { - this->node_id_ = config::kInvalidValue; + this->node_id_ = 0; this->node_host_ = result[0]; this->node_port_ = config::kBrokerPortDef; if (result.size() >= 2) { @@ -57,15 +61,14 @@ NodeInfo::NodeInfo(bool is_broker, const string& node_info) { buildStrInfo(); } -NodeInfo::NodeInfo(const string& node_host, int node_port) { +NodeInfo::NodeInfo(const string& node_host, uint32_t node_port) { this->node_id_ = config::kInvalidValue; this->node_host_ = node_host; this->node_port_ = node_port; buildStrInfo(); - } -NodeInfo::NodeInfo(int node_id, const string& node_host, int node_port) { +NodeInfo::NodeInfo(int node_id, const string& node_host, uint32_t node_port) { this->node_id_ = node_id; this->node_host_ = node_host; this->node_port_ = node_port; @@ -73,11 +76,11 @@ NodeInfo::NodeInfo(int node_id, const string& node_host, int node_port) { } NodeInfo::~NodeInfo() { - + // } NodeInfo& NodeInfo::operator=(const NodeInfo& target) { - if (this != &target){ + if (this != &target) { this->node_id_ = target.node_id_; this->node_host_ = target.node_host_; this->node_port_ = target.node_port_; @@ -95,14 +98,12 @@ bool NodeInfo::operator== (const NodeInfo& target) { return true; } return false; - } bool NodeInfo::operator< (const NodeInfo& target) const { return this->node_info_ < target.node_info_; } - -const int NodeInfo::GetNodeId() const { +const uint32_t NodeInfo::GetNodeId() const { return this->node_id_; } @@ -110,10 +111,10 @@ const string& NodeInfo::GetHost() const { return this->node_host_; } -const int NodeInfo::GetPort() const { +const uint32_t NodeInfo::GetPort() const { return this->node_port_; } - + const string& NodeInfo::GetAddrInfo() const { return this->addr_info_; } @@ -139,7 +140,7 @@ void NodeInfo::buildStrInfo() { Partition::Partition() { this->topic_ = " "; - this->partition_id_ = config::kInvalidValue; + this->partition_id_ = 0; buildPartitionKey(); } @@ -147,14 +148,14 @@ Partition::Partition() { Partition::Partition(const string& partition_info) { // initial process this->topic_ = " "; - this->partition_id_ = config::kInvalidValue; + this->partition_id_ = 0; // parse partition_info string - string::size_type pos=0; + string::size_type pos = 0; string seg_key = delimiter::kDelimiterPound; string token_key = delimiter::kDelimiterColon; // parse broker_info pos = partition_info.find(seg_key); - if (pos != string::npos){ + if (pos != string::npos) { string broker_info = partition_info.substr(0, pos); broker_info = Utils::Trim(broker_info); this->broker_info_ = NodeInfo(true, broker_info); @@ -172,12 +173,12 @@ Partition::Partition(const string& partition_info) { } buildPartitionKey(); } - + // part_str = topic:partition_id Partition::Partition(const NodeInfo& broker_info, const string& part_str) { vector result; this->topic_ = " "; - this->partition_id_ = config::kInvalidValue; + this->partition_id_ = 0; this->broker_info_ = broker_info; Utils::Split(part_str, result, delimiter::kDelimiterColon); if (result.size() >= 2) { @@ -187,7 +188,7 @@ Partition::Partition(const NodeInfo& broker_info, const string& part_str) { buildPartitionKey(); } -Partition::Partition(const NodeInfo& broker_info, const string& topic, int partition_id) { +Partition::Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id) { this->topic_ = topic; this->partition_id_ = partition_id; this->broker_info_ = broker_info; @@ -217,10 +218,9 @@ bool Partition::operator== (const Partition& target) { return true; } return false; - } -const int Partition::GetBrokerId() const { +const uint32_t Partition::GetBrokerId() const { return this->broker_info_.GetNodeId(); } @@ -228,7 +228,7 @@ const string& Partition::GetBrokerHost() const { return this->broker_info_.GetHost(); } -const int Partition::GetBrokerPort() const { +const uint32_t Partition::GetBrokerPort() const { return this->broker_info_.GetPort(); } @@ -244,7 +244,7 @@ const NodeInfo& Partition::GetBrokerInfo() const { return this->broker_info_; } -const int Partition::GetPartitionId() const { +const uint32_t Partition::GetPartitionId() const { return this->partition_id_; } @@ -273,13 +273,13 @@ void Partition::buildPartitionKey() { // sub_info = consumerId@group#broker_info#topic:partitionId SubscribeInfo::SubscribeInfo(const string& sub_info) { - string::size_type pos=0; + string::size_type pos = 0; string seg_key = delimiter::kDelimiterPound; string at_key = delimiter::kDelimiterAt; this->consumer_id_ = " "; this->group_ = " "; // parse sub_info - pos=sub_info.find(seg_key); + pos = sub_info.find(seg_key); if (pos != string::npos) { string consumer_info = sub_info.substr(0, pos); consumer_info = Utils::Trim(consumer_info); @@ -295,7 +295,7 @@ SubscribeInfo::SubscribeInfo(const string& sub_info) { buildSubInfo(); } -SubscribeInfo::SubscribeInfo(const string& consumer_id, +SubscribeInfo::SubscribeInfo(const string& consumer_id, const string& group, const Partition& partition) { this->consumer_id_ = consumer_id; this->group_ = group; @@ -325,7 +325,7 @@ const Partition& SubscribeInfo::GetPartition() const { return this->partition_; } -const int SubscribeInfo::GgetBrokerId() const { +const uint32_t SubscribeInfo::GgetBrokerId() const { return this->partition_.GetBrokerId(); } @@ -333,7 +333,7 @@ const string& SubscribeInfo::GetBrokerHost() const { return this->partition_.GetBrokerHost(); } -const int SubscribeInfo::GetBrokerPort() const { +const uint32_t SubscribeInfo::GetBrokerPort() const { return this->partition_.GetBrokerPort(); } @@ -341,7 +341,7 @@ const string& SubscribeInfo::GetTopic() const { return this->partition_.GetTopic(); } -const int SubscribeInfo::GetPartitionId() const { +const uint32_t SubscribeInfo::GetPartitionId() const { return this->partition_.GetPartitionId(); } @@ -373,8 +373,8 @@ ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) { this->subscribe_list_ = target.subscribe_list_; } -ConsumerEvent::ConsumerEvent(long rebalance_id,int event_type, - const list& subscribeInfo_lst, int event_status) { +ConsumerEvent::ConsumerEvent(int64_t rebalance_id, int32_t event_type, + const list& subscribeInfo_lst, int32_t event_status) { list::const_iterator it; this->rebalance_id_ = rebalance_id; this->event_type_ = event_type; @@ -385,7 +385,7 @@ ConsumerEvent::ConsumerEvent(long rebalance_id,int event_type, } ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) { - if(this != &target){ + if (this != &target) { this->rebalance_id_ = target.rebalance_id_; this->event_type_ = target.event_type_; this->event_status_ = target.event_status_; @@ -394,23 +394,23 @@ ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) { return *this; } -const long ConsumerEvent::GetRebalanceId() const { +const int64_t ConsumerEvent::GetRebalanceId() const { return this->rebalance_id_; } -const int ConsumerEvent::GetEventType() const { +const int32_t ConsumerEvent::GetEventType() const { return this->event_type_; } -const int ConsumerEvent::GetEventStatus() const { +const int32_t ConsumerEvent::GetEventStatus() const { return this->event_status_; } -void ConsumerEvent::SetEventType(int event_type) { +void ConsumerEvent::SetEventType(int32_t event_type) { this->event_type_ = event_type; } -void ConsumerEvent::SetEventStatus(int event_status) { +void ConsumerEvent::SetEventStatus(int32_t event_status) { this->event_status_ = event_status; } @@ -419,7 +419,7 @@ const list& ConsumerEvent::GetSubscribeInfoList() const { } string ConsumerEvent::ToString() { - int count = 0; + uint32_t count = 0; stringstream ss; list::const_iterator it; ss << "ConsumerEvent [rebalanceId="; @@ -429,16 +429,16 @@ string ConsumerEvent::ToString() { ss << ", status="; ss << this->event_status_; ss << ", subscribeInfoList=["; - for (it = this->subscribe_list_.begin(); + for (it = this->subscribe_list_.begin(); it != this->subscribe_list_.end(); ++it) { - if(count++ > 0) { + if (count++ > 0) { ss << ","; } ss << it->ToString(); } ss << "]]"; - return ss.str(); + return ss.str(); } -}; +}; // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc index d1fac8f..c699266 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc @@ -17,14 +17,14 @@ * under the License. */ +#include "utils.h" +#include #include -#include +#include "const_config.h" #include #include -#include #include -#include "utils.h" -#include "const_config.h" +#include namespace tubemq { @@ -48,21 +48,21 @@ string Utils::Trim(const string& source) { void Utils::Split(const string& source, vector& result, const string& delimiter) { string item_str; - string::size_type pos1,pos2; + string::size_type pos1 = 0; + string::size_type pos2 = 0; result.clear(); if (!source.empty()) { pos1 = 0; pos2 = source.find(delimiter); while (string::npos != pos2) { - item_str = Utils::Trim(source.substr(pos1, pos2-pos1)); + item_str = Utils::Trim(source.substr(pos1, pos2 - pos1)); pos1 = pos2 + delimiter.size(); pos2 = source.find(delimiter, pos1); if (!item_str.empty()) { result.push_back(item_str); } } - if (pos1 != source.length()) - { + if (pos1 != source.length()) { item_str = Utils::Trim(source.substr(pos1)); if (!item_str.empty()) { result.push_back(item_str); @@ -72,12 +72,14 @@ void Utils::Split(const string& source, vector& result, const string& de } -void Utils::Split(const string& source, map& result, +void Utils::Split(const string& source, map& result, const string& delimiter_step1, const string& delimiter_step2) { string item_str; string key_str; string val_str; - string::size_type pos1,pos2,pos3; + string::size_type pos1 = 0; + string::size_type pos2 = 0; + string::size_type pos3 = 0; if (!source.empty()) { pos1 = 0; pos2 = source.find(delimiter_step1); @@ -111,7 +113,7 @@ void Utils::Split(const string& source, map& result, val_str = item_str.substr(pos3+delimiter_step2.length()); key_str = Utils::Trim(key_str); val_str = Utils::Trim(val_str); - if (!key_str.empty()){ + if (!key_str.empty()) { result[key_str] = atoi(val_str.c_str()); } } @@ -130,8 +132,8 @@ void Utils::Join(const vector& vec, const string& delimiter, string& tar } } -bool Utils::ValidString(string& err_info, const string& source, - bool allow_empty, bool pat_match, bool check_max_length, +bool Utils::ValidString(string& err_info, const string& source, + bool allow_empty, bool pat_match, bool check_max_length, unsigned int maxlen) { if (source.empty()) { if (allow_empty) { @@ -153,11 +155,11 @@ bool Utils::ValidString(string& err_info, const string& source, } if (pat_match) { - int cflags =REG_EXTENDED; - regex_t reg; + int cflags = REG_EXTENDED; + regex_t reg; regmatch_t pmatch[1]; - const char* patRule = "^[a-zA-Z]\\w+$"; - regcomp(®, patRule,cflags); + const char* patRule = "^[a-zA-Z]\\w+$"; + regcomp(®, patRule, cflags); int status = regexec(®, source.c_str(), 1, pmatch, 0); regfree(®); if (status == REG_NOMATCH) { @@ -169,10 +171,10 @@ bool Utils::ValidString(string& err_info, const string& source, } } err_info = "Ok"; - return true; + return true; } -bool Utils::ValidGroupName(string& err_info, +bool Utils::ValidGroupName(string& err_info, const string& group_name, string& tgt_group_name) { tgt_group_name = Utils::Trim(group_name); if (tgt_group_name.empty()) { @@ -188,11 +190,11 @@ bool Utils::ValidGroupName(string& err_info, err_info = ss.str(); return false; } - int cflags =REG_EXTENDED; - regex_t reg; + int cflags = REG_EXTENDED; + regex_t reg; regmatch_t pmatch[1]; - const char* patRule = "^[a-zA-Z][\\w-]+$"; - regcomp(®, patRule,cflags); + const char* patRule = "^[a-zA-Z][\\w-]+$"; + regcomp(®, patRule, cflags); int status = regexec(®, tgt_group_name.c_str(), 1, pmatch, 0); regfree(®); if (status == REG_NOMATCH) { @@ -205,10 +207,10 @@ bool Utils::ValidGroupName(string& err_info, return false; } err_info = "Ok"; - return true; + return true; } -bool Utils::ValidFilterItem(string& err_info, +bool Utils::ValidFilterItem(string& err_info, const string& src_filteritem, string& tgt_filteritem) { tgt_filteritem = Utils::Trim(src_filteritem); if (tgt_filteritem.empty()) { @@ -223,11 +225,11 @@ bool Utils::ValidFilterItem(string& err_info, err_info = ss.str(); return false; } - int cflags =REG_EXTENDED; - regex_t reg; + int cflags = REG_EXTENDED; + regex_t reg; regmatch_t pmatch[1]; - const char* patRule = "^[_A-Za-z0-9]+$"; - regcomp(®, patRule,cflags); + const char* patRule = "^[_A-Za-z0-9]+$"; + regcomp(®, patRule, cflags); int status = regexec(®, tgt_filteritem.c_str(), 1, pmatch, 0); regfree(®); if (status == REG_NOMATCH) { @@ -235,24 +237,23 @@ bool Utils::ValidFilterItem(string& err_info, return false; } err_info = "Ok"; - return true; + return true; } - -string Utils::Int2str(int data) { +string Utils::Int2str(int32_t data) { stringstream ss; - ss< result_vec; Utils::Split(ipv4_addr, result_vec, delimiter::kDelimiterDot); @@ -263,13 +264,13 @@ int Utils::IpToInt(const string& ipv4_addr) { return result; } -long Utils::GetCurrentTimeMillis() { +int64_t Utils::GetCurrentTimeMillis() { struct timeval tv; - gettimeofday(&tv,NULL); + gettimeofday(&tv, NULL); return tv.tv_sec * 1000 + tv.tv_usec /1000; } -} +} // namespace tubemq