From: wangwei@apache.org
To: commits@singa.incubator.apache.org
Reply-To: dev@singa.incubator.apache.org
Date: Wed, 24 Jun 2015 13:35:49 -0000
Message-Id: <73e698da8df64c678c834d4d7e5b1fab@git.apache.org>
In-Reply-To: <364688bdfdbe4c319774772775996c16@git.apache.org>
References: <364688bdfdbe4c319774772775996c16@git.apache.org>
Subject: [06/18] incubator-singa git commit: SINGA-21 Code review

SINGA-21 Code review

review data_shard.h, data_shard.cc
  -- refine Next() functions for reading data shards
  -- reformat

add unit test for data shard

Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/28ac5098
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/28ac5098
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/28ac5098

Branch: refs/heads/master
Commit: 28ac509830adda0b26d09526fa4a1e2c2e82a07a
Parents: aefc2d4
Author: wang sheng
Authored: Wed Jun 24 15:23:43 2015 +0800
Committer: wang wei
Committed: Wed Jun 24 17:06:54 2015 +0800

----------------------------------------------------------------------
 include/utils/data_shard.h | 99 +++++++------
 src/test/test_cluster.cc   | 30 ++--
 src/test/test_shard.cc     | 64
+++++++++ src/utils/cluster_rt.cc | 2 +- src/utils/data_shard.cc | 308 +++++++++++++++++++--------------------- 5 files changed, 274 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/include/utils/data_shard.h ---------------------------------------------------------------------- diff --git a/include/utils/data_shard.h b/include/utils/data_shard.h index 2ebade9..4156ab6 100644 --- a/include/utils/data_shard.h +++ b/include/utils/data_shard.h @@ -1,14 +1,11 @@ -#ifndef INCLUDE_UTILS_SHARD_H_ -#define INCLUDE_UTILS_SHARD_H_ +#ifndef SINGA_UTILS_DATA_SHARD_H_ +#define SINGA_UTILS_DATA_SHARD_H_ #include #include #include #include - -using google::protobuf::Message; - namespace singa { /** @@ -22,8 +19,8 @@ namespace singa { * encoded as [key_len key record_len val] (key_len and record_len are of type * uint32, which indicate the bytes of key and record respectively. * - * When Shard obj is created, it will remove the last key if the record size and - * key size do not match because the last write of tuple crashed. + * When Shard obj is created, it will remove the last key if the record size + * and key size do not match because the last write of tuple crashed. * * TODO * 1. split one shard into multile shards. @@ -33,54 +30,58 @@ namespace singa { class DataShard { public: enum { - //!< read only mode used in training - kRead=0, - //!< write mode used in creating shard (will overwrite previous one) - kCreate=1, - //!< append mode, e.g. used when previous creating crashes - kAppend=2 + // read only mode used in training + kRead = 0, + // write mode used in creating shard (will overwrite previous one) + kCreate = 1, + // append mode, e.g. used when previous creating crashes + kAppend = 2 }; - public: /** * Init the shard obj. - * @folder shard folder (path excluding shard.dat) on worker node - * @mode shard open mode, Shard::kRead, Shard::kWrite or Shard::kAppend - * @bufsize batch bufsize bytes data for every disk op (read or write), + * + * @param folder Shard folder (path excluding shard.dat) on worker node + * @param mode Shard open mode, Shard::kRead, Shard::kWrite or Shard::kAppend + * @param bufsize Batch bufsize bytes data for every disk op (read or write), * default is 100MB */ - DataShard(std::string folder, char mode, int capacity=104857600); + DataShard(const std::string& folder, int mode); + DataShard(const std::string& folder, int mode, int capacity); ~DataShard(); /** * read next tuple from the shard. - * @key key - * @param val record of type Message - * @return true if read success otherwise false, e.g., the tuple was not - * inserted completely. + * + * @param key Tuple key + * @param val Record of type Message + * @return false if read unsuccess, e.g., the tuple was not inserted + * completely. */ - bool Next(std::string *key, Message* val); + bool Next(std::string* key, google::protobuf::Message* val); /** * read next tuple from the shard. - * @key key tuple key - * @param val record of type string - * @return true if read success otherwise false, e.g., the tuple was not - * inserted completely. + * + * @param key Tuple key + * @param val Record of type string + * @return false if read unsuccess, e.g., the tuple was not inserted + * completely. */ - bool Next(std::string *key, std::string* val); - + bool Next(std::string* key, std::string* val); /** * Append one tuple to the shard. 
+ * * @param key e.g., image path * @param val - * @return reture if sucess, otherwise false, e.g., inserted before + * @return false if unsucess, e.g., inserted before */ - bool Insert(const std::string& key, const Message& tuple); + bool Insert(const std::string& key, const google::protobuf::Message& tuple); /** * Append one tuple to the shard. + * * @param key e.g., image path * @param val - * @return reture if sucess, otherwise false, e.g., inserted before + * @return false if unsucess, e.g., inserted before */ bool Insert(const std::string& key, const std::string& tuple); /** @@ -92,54 +93,58 @@ class DataShard { * Flush buffered data to disk. * Used only for kCreate or kAppend. */ - void Flush() ; + void Flush(); /** * Iterate through all tuples to get the num of all tuples. + * * @return num of tuples */ - const int Count(); + int Count(); /** * @return path to shard file */ - const std::string path(){ - return path_; - } + inline std::string path() { return path_; } protected: /** * Read the next key and prepare buffer for reading value. + * * @param key * @return length (i.e., bytes) of value field. */ - int Next(std::string *key); + int Next(std::string* key); /** * Setup the disk pointer to the right position for append in case that * the pervious write crashes. + * * @param path shard path. * @return offset (end pos) of the last success written record. */ - int PrepareForAppend(std::string path); + int PrepareForAppend(const std::string& path); /** * Read data from disk if the current data in the buffer is not a full field. + * * @param size size of the next field. */ bool PrepareNextField(int size); private: - char mode_; - std::string path_; + char mode_ = 0; + std::string path_ = ""; // either ifstream or ofstream std::fstream fdat_; // to avoid replicated record std::unordered_set keys_; // internal buffer - char* buf_; + char* buf_ = nullptr; // offset inside the buf_ - int offset_; + int offset_ = 0; // allocated bytes for the buf_ - int capacity_; + int capacity_ = 0; // bytes in buf_, used in reading - int bufsize_; + int bufsize_ = 0; }; -} /* singa */ -#endif // INCLUDE_UTILS_SHARD_H_ + +} // namespace singa + +#endif // SINGA_UTILS_DATA_SHARD_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/test/test_cluster.cc ---------------------------------------------------------------------- diff --git a/src/test/test_cluster.cc b/src/test/test_cluster.cc index b16d765..c34dd0f 100644 --- a/src/test/test_cluster.cc +++ b/src/test/test_cluster.cc @@ -1,46 +1,36 @@ -#include #include "gtest/gtest.h" #include "proto/cluster.pb.h" #include "utils/cluster.h" using namespace singa; -//string folder="src/test/data/"; +std::string host = "localhost:2181"; -string host="localhost:2181"; - -void zk_cb(void *contest){ - LOG(INFO) << "zk callback: " << (char *)contest; +void zk_cb(void *contest) { + LOG(INFO) << "zk callback: " << static_cast(contest); } -TEST(CluserRuntimeTest, GroupManagement){ +TEST(CluserRuntimeTest, GroupManagement) { ClusterRuntime* rt = new ZKClusterRT(host); ASSERT_EQ(rt->Init(), true); - - ASSERT_EQ(rt->sWatchSGroup(1, 1, zk_cb, "test call back"), true); - - ASSERT_EQ(rt->wJoinSGroup(1, 1, 1), true); - ASSERT_EQ(rt->wJoinSGroup(1, 2, 1), true); - - ASSERT_EQ(rt->wLeaveSGroup(1, 2, 1), true); - ASSERT_EQ(rt->wLeaveSGroup(1, 1, 1), true); - + ASSERT_EQ(rt->WatchSGroup(1, 1, zk_cb, "test call back"), true); + ASSERT_EQ(rt->JoinSGroup(1, 1, 1), true); + ASSERT_EQ(rt->JoinSGroup(1, 2, 1), true); + ASSERT_EQ(rt->LeaveSGroup(1, 2, 1), true); + 
ASSERT_EQ(rt->LeaveSGroup(1, 1, 1), true); sleep(3); delete rt; } -TEST(CluserRuntimeTest, ProcessManagement){ +TEST(CluserRuntimeTest, ProcessManagement) { ClusterRuntime* rt = new ZKClusterRT(host); ASSERT_EQ(rt->Init(), true); - ASSERT_EQ(rt->RegistProc("1.2.3.4:5"), 0); ASSERT_EQ(rt->RegistProc("1.2.3.4:6"), 1); ASSERT_EQ(rt->RegistProc("1.2.3.4:7"), 2); - ASSERT_NE(rt->GetProcHost(0), ""); ASSERT_NE(rt->GetProcHost(1), ""); ASSERT_NE(rt->GetProcHost(2), ""); - sleep(3); delete rt; } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/test/test_shard.cc ---------------------------------------------------------------------- diff --git a/src/test/test_shard.cc b/src/test/test_shard.cc new file mode 100644 index 0000000..6fe478e --- /dev/null +++ b/src/test/test_shard.cc @@ -0,0 +1,64 @@ +#include + +#include "gtest/gtest.h" +#include "utils/data_shard.h" + +std::string key[] = {"firstkey", + "secondkey", + "3key", + "key4", + "key5"}; +std::string tuple[] = {"firsttuple", + "2th-tuple", + "thridtuple", + "tuple4", + "tuple5"}; + +using namespace singa; + +TEST(DataShardTest, CreateDataShard) { + std::string path = "src/test/data/shard_test"; + mkdir(path.c_str(), 0755); + DataShard shard(path, DataShard::kCreate, 50); + shard.Insert(key[0], tuple[0]); + shard.Insert(key[1], tuple[1]); + shard.Insert(key[2], tuple[2]); + shard.Flush(); +} + +TEST(DataShardTest, AppendDataShard) { + std::string path = "src/test/data/shard_test"; + DataShard shard(path, DataShard::kAppend, 50); + shard.Insert(key[3], tuple[3]); + shard.Insert(key[4], tuple[4]); + shard.Flush(); +} + +TEST(DataShardTest, CountDataShard) { + std::string path = "src/test/data/shard_test"; + DataShard shard(path, DataShard::kRead, 50); + int count = shard.Count(); + ASSERT_EQ(5, count); +} + +TEST(DataShardTest, ReadDataShard) { + std::string path = "src/test/data/shard_test"; + DataShard shard(path, DataShard::kRead, 50); + std::string k, t; + ASSERT_TRUE(shard.Next(&k, &t)); + ASSERT_STREQ(key[0].c_str(), k.c_str()); + ASSERT_STREQ(tuple[0].c_str(), t.c_str()); + ASSERT_TRUE(shard.Next(&k, &t)); + ASSERT_STREQ(key[1].c_str(), k.c_str()); + ASSERT_STREQ(tuple[1].c_str(), t.c_str()); + ASSERT_TRUE(shard.Next(&k, &t)); + ASSERT_TRUE(shard.Next(&k, &t)); + ASSERT_TRUE(shard.Next(&k, &t)); + ASSERT_STREQ(key[4].c_str(), k.c_str()); + ASSERT_STREQ(tuple[4].c_str(), t.c_str()); + ASSERT_FALSE(shard.Next(&k, &t)); + shard.SeekToFirst(); + ASSERT_TRUE(shard.Next(&k, &t)); + ASSERT_STREQ(key[0].c_str(), k.c_str()); + ASSERT_STREQ(tuple[0].c_str(), t.c_str()); +} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/utils/cluster_rt.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc index 748a261..bedf714 100644 --- a/src/utils/cluster_rt.cc +++ b/src/utils/cluster_rt.cc @@ -183,7 +183,7 @@ bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag, } // copy the node name ot output if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) { - snprintf(output, strlen(buf), "%s", buf); + strcpy(output, buf); } if (ret == ZOK) { LOG(INFO) << "created zookeeper node " << buf http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/utils/data_shard.cc ---------------------------------------------------------------------- diff --git a/src/utils/data_shard.cc b/src/utils/data_shard.cc index df311e1..1dc61d2 100644 --- a/src/utils/data_shard.cc +++ b/src/utils/data_shard.cc @@ -1,207 
+1,193 @@ -#include +#include "utils/data_shard.h" + #include +#include -#include "utils/data_shard.h" namespace singa { -DataShard::DataShard(std::string folder, char mode, int capacity){ - struct stat sb; - if(stat(folder.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)){ - LOG(INFO)<<"Open shard folder "<capacity_){ - fdat_.write(buf_, offset_); - offset_=0; - CHECK_LE(size, capacity_)<<"Tuple size is larger than capacity" - <<"Try a larger capacity size"; - } - *reinterpret_cast(buf_+offset_)=key.size(); - offset_+=sizeof(size_t); - memcpy(buf_+offset_, key.data(), key.size()); - offset_+=key.size(); - *reinterpret_cast(buf_+offset_)=val.size(); - offset_+=sizeof(size_t); - memcpy(buf_+offset_, val.data(), val.size()); - offset_+=val.size(); +bool DataShard::Next(std::string* key, google::protobuf::Message* val) { + int vallen = Next(key); + if (vallen == 0) return false; + val->ParseFromArray(buf_ + offset_, vallen); + offset_ += vallen; return true; } -void DataShard::Flush() { - fdat_.write(buf_, offset_); - fdat_.flush(); - offset_=0; -} - -int DataShard::Next(std::string *key){ - key->clear(); - int ssize=sizeof(size_t); - if(!PrepareNextField(ssize)) - return 0; - CHECK_LE(offset_+ssize, bufsize_); - int keylen=*reinterpret_cast(buf_+offset_); - offset_+=ssize; - - if(!PrepareNextField(keylen)) - return 0; - CHECK_LE(offset_+keylen, bufsize_); - for(int i=0;ipush_back(buf_[offset_+i]); - offset_+=keylen; - - if(!PrepareNextField(ssize)) - return 0; - CHECK_LE(offset_+ssize, bufsize_); - int vallen=*reinterpret_cast(buf_+offset_); - offset_+=ssize; - - if(!PrepareNextField(vallen)) - return 0; - CHECK_LE(offset_+vallen, bufsize_); - return vallen; +bool DataShard::Next(std::string *key, std::string* val) { + int vallen = Next(key); + if (vallen == 0) return false; + val->clear(); + for (int i = 0; i < vallen; ++i) + val->push_back(buf_[offset_ + i]); + offset_ += vallen; + return true; } -bool DataShard::Next(std::string *key, Message* val) { - int vallen=Next(key); - if(vallen==0) - return false; - val->ParseFromArray(buf_+offset_, vallen); - offset_+=vallen; - return true; +bool DataShard::Insert(const std::string& key, + const google::protobuf::Message& val) { + std::string str; + val.SerializeToString(&str); + return Insert(key, str); } -bool DataShard::Next(std::string *key, std::string* val) { - int vallen=Next(key); - if(vallen==0) +// insert one complete tuple +bool DataShard::Insert(const std::string& key, const std::string& val) { + if (keys_.find(key) != keys_.end() || val.size() == 0) return false; - val->clear(); - for(int i=0;ipush_back(buf_[offset_+i]); - offset_+=vallen; + int size = key.size() + val.size() + 2*sizeof(size_t); + if (bufsize_ + size > capacity_) { + fdat_.write(buf_, bufsize_); + bufsize_ = 0; + CHECK_LE(size, capacity_) << "Tuple size is larger than capacity " + << "Try a larger capacity size"; + } + *reinterpret_cast(buf_ + bufsize_) = key.size(); + bufsize_ += sizeof(size_t); + memcpy(buf_ + bufsize_, key.data(), key.size()); + bufsize_ += key.size(); + *reinterpret_cast(buf_ + bufsize_) = val.size(); + bufsize_ += sizeof(size_t); + memcpy(buf_ + bufsize_, val.data(), val.size()); + bufsize_ += val.size(); return true; } -void DataShard::SeekToFirst(){ +void DataShard::SeekToFirst() { CHECK_EQ(mode_, kRead); - bufsize_=0; - offset_=0; + bufsize_ = 0; + offset_ = 0; fdat_.close(); - fdat_.open(path_, std::ios::in|std::ios::binary); - CHECK(fdat_.is_open())<<"Cannot create file "<bufsize_){ - bufsize_-=offset_; - CHECK_LE(bufsize_, offset_); - for(int 
i=0;i(&len), sizeof(len)); - if(fin.good()) - fin.seekg(len, std::ios_base::cur); - else break; - if(fin.good()) - fin.read(reinterpret_cast(&len), sizeof(len)); - else break; - if(fin.good()) - fin.seekg(len, std::ios_base::cur); - else break; - if(!fin.good()) - break; + if (!fin.good()) break; + fin.seekg(len, std::ios_base::cur); + if (!fin.good()) break; + fin.read(reinterpret_cast(&len), sizeof(len)); + if (!fin.good()) break; + fin.seekg(len, std::ios_base::cur); + if (!fin.good()) break; count++; } fin.close(); return count; } -int DataShard::PrepareForAppend(std::string path){ - std::ifstream fin(path, std::ios::in|std::ios::binary); - if(!fin.is_open()){ - fdat_.open(path, std::ios::out|std::ios::binary); - fdat_.flush(); - fdat_.close(); - return 0; - } +int DataShard::Next(std::string *key) { + key->clear(); + int ssize = sizeof(size_t); + if (!PrepareNextField(ssize)) return 0; + int keylen = *reinterpret_cast(buf_ + offset_); + offset_ += ssize; + if (!PrepareNextField(keylen)) return 0; + for (int i = 0; i < keylen; ++i) + key->push_back(buf_[offset_ + i]); + offset_ += keylen; + if (!PrepareNextField(ssize)) return 0; + int vallen = *reinterpret_cast(buf_ + offset_); + offset_ += ssize; + if (!PrepareNextField(vallen)) return 0; + return vallen; +} - int last_tuple_offset=0; +int DataShard::PrepareForAppend(const std::string& path) { + std::ifstream fin(path, std::ios::in | std::ios::binary); + if (!fin.is_open()) return 0; + int last_tuple_offset = 0; char buf[256]; size_t len; - while(true){ - memset(buf, 0, 256); + while (true) { + fin.read(reinterpret_cast(&len), sizeof(len)); + if (!fin.good()) break; + fin.read(buf, len); + buf[len] = '\0'; + if (!fin.good()) break; fin.read(reinterpret_cast(&len), sizeof(len)); - if(fin.good()) - fin.read(buf, len); - else break; - if(fin.good()) - fin.read(reinterpret_cast(&len), sizeof(len)); - else break; - if(fin.good()) - fin.seekg(len, std::ios_base::cur); - else break; - if(fin.good()) - keys_.insert(std::string(buf)); - else break; - last_tuple_offset=fin.tellg(); + if (!fin.good()) break; + fin.seekg(len, std::ios_base::cur); + if (!fin.good()) break; + keys_.insert(std::string(buf)); + last_tuple_offset = fin.tellg(); } fin.close(); return last_tuple_offset; } -} /* singa */ + +// if the buf does not have the next complete field, read data from disk +bool DataShard::PrepareNextField(int size) { + if (offset_ + size > bufsize_) { + bufsize_ -= offset_; + // wangsh: commented, not sure what this check does + // CHECK_LE(bufsize_, offset_); + for (int i = 0; i < bufsize_; ++i) + buf_[i] = buf_[i + offset_]; + offset_ = 0; + if (fdat_.eof()) { + return false; + } else { + fdat_.read(buf_ + bufsize_, capacity_ - bufsize_); + bufsize_ += fdat_.gcount(); + if (size > bufsize_) return false; + } + } + return true; +} + +} // namespace singa
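
----------------------------------------------------------------------
For reference, a minimal usage sketch of the refactored DataShard API,
modeled on the new unit test src/test/test_shard.cc above. The folder
path, keys, values and the 50-byte capacity are illustrative only; each
tuple is stored as [key_len key val_len val] with size_t length fields,
so it occupies key.size() + val.size() + 2 * sizeof(size_t) bytes of the
write buffer and must fit within the configured capacity.

#include <string>
#include <sys/stat.h>

#include "utils/data_shard.h"

int main() {
  // illustrative folder; the shard file itself is <folder>/shard.dat
  std::string folder = "/tmp/shard_example";
  mkdir(folder.c_str(), 0755);

  {  // kCreate: buffer tuples, then Flush() to push them to disk
    singa::DataShard shard(folder, singa::DataShard::kCreate, 50);
    shard.Insert("img_000.jpg", "label:0");
    shard.Insert("img_001.jpg", "label:1");
    shard.Flush();
  }

  {  // kRead: iterate tuples; Next() returns false once the shard is exhausted
    singa::DataShard shard(folder, singa::DataShard::kRead, 50);
    std::string key, val;
    while (shard.Next(&key, &val)) {
      // consume (key, val)
    }
    shard.SeekToFirst();  // rewind, e.g. before the next training epoch
  }
  return 0;
}

The google::protobuf::Message overloads of Insert() and Next() behave the
same way: Insert() serializes the message into the val field, and Next()
parses it back from the read buffer.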