singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [06/18] incubator-singa git commit: SINGA-21 Code review
Date Wed, 24 Jun 2015 13:35:49 GMT
SINGA-21 Code review

review data_shard.h, data_shard.cc
  -- refine Next() functions for reading data shards
  -- reformat

add unit test for data shard


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/28ac5098
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/28ac5098
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/28ac5098

Branch: refs/heads/master
Commit: 28ac509830adda0b26d09526fa4a1e2c2e82a07a
Parents: aefc2d4
Author: wang sheng <wangsheng1001@gmail.com>
Authored: Wed Jun 24 15:23:43 2015 +0800
Committer: wang wei <wangwei@comp.nus.edu.sg>
Committed: Wed Jun 24 17:06:54 2015 +0800

----------------------------------------------------------------------
 include/utils/data_shard.h |  99 +++++++------
 src/test/test_cluster.cc   |  30 ++--
 src/test/test_shard.cc     |  64 +++++++++
 src/utils/cluster_rt.cc    |   2 +-
 src/utils/data_shard.cc    | 308 +++++++++++++++++++---------------------
 5 files changed, 274 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/include/utils/data_shard.h
----------------------------------------------------------------------
diff --git a/include/utils/data_shard.h b/include/utils/data_shard.h
index 2ebade9..4156ab6 100644
--- a/include/utils/data_shard.h
+++ b/include/utils/data_shard.h
@@ -1,14 +1,11 @@
-#ifndef INCLUDE_UTILS_SHARD_H_
-#define INCLUDE_UTILS_SHARD_H_
+#ifndef SINGA_UTILS_DATA_SHARD_H_
+#define SINGA_UTILS_DATA_SHARD_H_
 
 #include <google/protobuf/message.h>
 #include <fstream>
 #include <string>
 #include <unordered_set>
 
-
-using google::protobuf::Message;
-
 namespace singa {
 
 /**
@@ -22,8 +19,8 @@ namespace singa {
  * encoded as [key_len key record_len val] (key_len and record_len are of type
  * uint32, which indicate the bytes of key and record respectively.
  *
- * When Shard obj is created, it will remove the last key if the record size and
- * key size do not match because the last write of tuple crashed.
+ * When Shard obj is created, it will remove the last key if the record size
+ * and key size do not match because the last write of tuple crashed.
  *
  * TODO
 * 1. split one shard into multiple shards.
@@ -33,54 +30,58 @@ namespace singa {
 class DataShard {
  public:
   enum {
-    //!< read only mode used in training
-    kRead=0,
-    //!< write mode used in creating shard (will overwrite previous one)
-    kCreate=1,
-    //!< append mode, e.g. used when previous creating crashes
-    kAppend=2
+    // read only mode used in training
+    kRead = 0,
+    // write mode used in creating shard (will overwrite previous one)
+    kCreate = 1,
+    // append mode, e.g. used when previous creating crashes
+    kAppend = 2
   };
 
- public:
   /**
    * Init the shard obj.
-   * @folder shard folder (path excluding shard.dat) on worker node
-   * @mode shard open mode, Shard::kRead, Shard::kWrite or Shard::kAppend
-   * @bufsize batch bufsize bytes data for every disk op (read or write),
+   *
+   * @param folder Shard folder (path excluding shard.dat) on worker node
+   * @param mode Shard open mode, Shard::kRead, Shard::kCreate or Shard::kAppend
+   * @param bufsize Batch bufsize bytes data for every disk op (read or write),
    * default is 100MB
    */
-  DataShard(std::string folder, char mode, int capacity=104857600);
+  DataShard(const std::string& folder, int mode);
+  DataShard(const std::string& folder, int mode, int capacity);
   ~DataShard();
 
   /**
    * read next tuple from the shard.
-   * @key key
-   * @param val record of type Message
-   * @return true if read success otherwise false, e.g., the tuple was not
-   * inserted completely.
+   * 
+   * @param key Tuple key
+   * @param val Record of type Message
+   * @return false if the read fails, e.g., the tuple was not inserted
+   *         completely.
    */
-  bool Next(std::string *key, Message* val);
+  bool Next(std::string* key, google::protobuf::Message* val);
   /**
    * read next tuple from the shard.
-   * @key key tuple key
-   * @param val record of type string
-   * @return true if read success otherwise false, e.g., the tuple was not
-   * inserted completely.
+   * 
+   * @param key Tuple key
+   * @param val Record of type string
+   * @return false if the read fails, e.g., the tuple was not inserted
+   *         completely.
    */
-  bool Next(std::string *key, std::string* val);
-
+  bool Next(std::string* key, std::string* val);
   /**
    * Append one tuple to the shard.
+   * 
    * @param key e.g., image path
    * @param val
-   * @return reture if sucess, otherwise false, e.g., inserted before
+   * @return false on failure, e.g., the key was inserted before
    */
-  bool Insert(const std::string& key, const Message& tuple);
+  bool Insert(const std::string& key, const google::protobuf::Message& tuple);
   /**
    * Append one tuple to the shard.
+   * 
    * @param key e.g., image path
    * @param val
-   * @return reture if sucess, otherwise false, e.g., inserted before
+   * @return false on failure, e.g., the key was inserted before
    */
   bool Insert(const std::string& key, const std::string& tuple);
   /**
@@ -92,54 +93,58 @@ class DataShard {
    * Flush buffered data to disk.
    * Used only for kCreate or kAppend.
    */
-  void Flush() ;
+  void Flush();
   /**
    * Iterate through all tuples to get the num of all tuples.
+   * 
    * @return num of tuples
    */
-  const int Count();
+  int Count();
   /**
    * @return path to shard file
    */
-  const std::string path(){
-    return path_;
-  }
+  inline std::string path() { return path_; }
 
  protected:
   /**
    * Read the next key and prepare buffer for reading value.
+   * 
    * @param key
    * @return length (i.e., bytes) of value field.
    */
-  int Next(std::string *key);
+  int Next(std::string* key);
   /**
    * Setup the disk pointer to the right position for append in case that
   * the previous write crashes.
+   * 
    * @param path shard path.
    * @return offset (end pos) of the last success written record.
    */
-  int PrepareForAppend(std::string path);
+  int PrepareForAppend(const std::string& path);
   /**
    * Read data from disk if the current data in the buffer is not a full field.
+   * 
    * @param size size of the next field.
    */
   bool PrepareNextField(int size);
 
  private:
-  char mode_;
-  std::string path_;
+  char mode_ = 0;
+  std::string path_ = "";
   // either ifstream or ofstream
   std::fstream fdat_;
   // to avoid replicated record
   std::unordered_set<std::string> keys_;
   // internal buffer
-  char* buf_;
+  char* buf_ = nullptr;
   // offset inside the buf_
-  int offset_;
+  int offset_ = 0;
   // allocated bytes for the buf_
-  int capacity_;
+  int capacity_ = 0;
   // bytes in buf_, used in reading
-  int bufsize_;
+  int bufsize_ = 0;
 };
-} /* singa */
-#endif  // INCLUDE_UTILS_SHARD_H_
+
+}  // namespace singa
+
+#endif  // SINGA_UTILS_DATA_SHARD_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/test/test_cluster.cc
----------------------------------------------------------------------
diff --git a/src/test/test_cluster.cc b/src/test/test_cluster.cc
index b16d765..c34dd0f 100644
--- a/src/test/test_cluster.cc
+++ b/src/test/test_cluster.cc
@@ -1,46 +1,36 @@
-#include <fstream>
 #include "gtest/gtest.h"
 #include "proto/cluster.pb.h"
 #include "utils/cluster.h"
 
 using namespace singa;
 
-//string folder="src/test/data/";
+std::string host = "localhost:2181";
 
-string host="localhost:2181";
-
-void zk_cb(void *contest){
-  LOG(INFO) << "zk callback: " << (char *)contest;
+void zk_cb(void *contest) {
+  LOG(INFO) << "zk callback: " << static_cast<char *>(contest);
 }
 
-TEST(CluserRuntimeTest, GroupManagement){
+TEST(CluserRuntimeTest, GroupManagement) {
   ClusterRuntime* rt = new ZKClusterRT(host);
   ASSERT_EQ(rt->Init(), true);
-  
-  ASSERT_EQ(rt->sWatchSGroup(1, 1, zk_cb, "test call back"), true);
-  
-  ASSERT_EQ(rt->wJoinSGroup(1, 1, 1), true);
-  ASSERT_EQ(rt->wJoinSGroup(1, 2, 1), true);
-  
-  ASSERT_EQ(rt->wLeaveSGroup(1, 2, 1), true);
-  ASSERT_EQ(rt->wLeaveSGroup(1, 1, 1), true);
-  
+  ASSERT_EQ(rt->WatchSGroup(1, 1, zk_cb, "test call back"), true);
+  ASSERT_EQ(rt->JoinSGroup(1, 1, 1), true);
+  ASSERT_EQ(rt->JoinSGroup(1, 2, 1), true);
+  ASSERT_EQ(rt->LeaveSGroup(1, 2, 1), true);
+  ASSERT_EQ(rt->LeaveSGroup(1, 1, 1), true);
   sleep(3);
   delete rt;
 }
 
-TEST(CluserRuntimeTest, ProcessManagement){
+TEST(CluserRuntimeTest, ProcessManagement) {
   ClusterRuntime* rt = new ZKClusterRT(host);
   ASSERT_EQ(rt->Init(), true);
-  
   ASSERT_EQ(rt->RegistProc("1.2.3.4:5"), 0);
   ASSERT_EQ(rt->RegistProc("1.2.3.4:6"), 1);
   ASSERT_EQ(rt->RegistProc("1.2.3.4:7"), 2);
-
   ASSERT_NE(rt->GetProcHost(0), "");
   ASSERT_NE(rt->GetProcHost(1), "");
   ASSERT_NE(rt->GetProcHost(2), "");
-
   sleep(3);
   delete rt;
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/test/test_shard.cc
----------------------------------------------------------------------
diff --git a/src/test/test_shard.cc b/src/test/test_shard.cc
new file mode 100644
index 0000000..6fe478e
--- /dev/null
+++ b/src/test/test_shard.cc
@@ -0,0 +1,64 @@
+#include <sys/stat.h>
+
+#include "gtest/gtest.h"
+#include "utils/data_shard.h"
+
+std::string key[] = {"firstkey",
+                     "secondkey",
+                     "3key",
+                     "key4",
+                     "key5"};
+std::string tuple[] = {"firsttuple",
+                       "2th-tuple",
+                       "thridtuple",
+                       "tuple4",
+                       "tuple5"};
+
+using namespace singa;
+
+TEST(DataShardTest, CreateDataShard) {
+  std::string path = "src/test/data/shard_test";
+  mkdir(path.c_str(), 0755);
+  DataShard shard(path, DataShard::kCreate, 50);
+  shard.Insert(key[0], tuple[0]);
+  shard.Insert(key[1], tuple[1]);
+  shard.Insert(key[2], tuple[2]);
+  shard.Flush();
+}
+
+TEST(DataShardTest, AppendDataShard) {
+  std::string path = "src/test/data/shard_test";
+  DataShard shard(path, DataShard::kAppend, 50);
+  shard.Insert(key[3], tuple[3]);
+  shard.Insert(key[4], tuple[4]);
+  shard.Flush();
+}
+
+TEST(DataShardTest, CountDataShard) {
+  std::string path = "src/test/data/shard_test";
+  DataShard shard(path, DataShard::kRead, 50);
+  int count = shard.Count();
+  ASSERT_EQ(5, count);
+}
+
+TEST(DataShardTest, ReadDataShard) {
+  std::string path = "src/test/data/shard_test";
+  DataShard shard(path, DataShard::kRead, 50);
+  std::string k, t;
+  ASSERT_TRUE(shard.Next(&k, &t));
+  ASSERT_STREQ(key[0].c_str(), k.c_str());
+  ASSERT_STREQ(tuple[0].c_str(), t.c_str());
+  ASSERT_TRUE(shard.Next(&k, &t));
+  ASSERT_STREQ(key[1].c_str(), k.c_str());
+  ASSERT_STREQ(tuple[1].c_str(), t.c_str());
+  ASSERT_TRUE(shard.Next(&k, &t));
+  ASSERT_TRUE(shard.Next(&k, &t));
+  ASSERT_TRUE(shard.Next(&k, &t));
+  ASSERT_STREQ(key[4].c_str(), k.c_str());
+  ASSERT_STREQ(tuple[4].c_str(), t.c_str());
+  ASSERT_FALSE(shard.Next(&k, &t));
+  shard.SeekToFirst();
+  ASSERT_TRUE(shard.Next(&k, &t));
+  ASSERT_STREQ(key[0].c_str(), k.c_str());
+  ASSERT_STREQ(tuple[0].c_str(), t.c_str());
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 748a261..bedf714 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -183,7 +183,7 @@ bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int
flag,
   }
  // copy the node name to output
   if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) {
-    snprintf(output, strlen(buf), "%s", buf);
+    strcpy(output, buf);
   }
   if (ret == ZOK) {
     LOG(INFO) << "created zookeeper node " << buf

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/28ac5098/src/utils/data_shard.cc
----------------------------------------------------------------------
diff --git a/src/utils/data_shard.cc b/src/utils/data_shard.cc
index df311e1..1dc61d2 100644
--- a/src/utils/data_shard.cc
+++ b/src/utils/data_shard.cc
@@ -1,207 +1,193 @@
-#include <sys/stat.h>
+#include "utils/data_shard.h"
+
 #include <glog/logging.h>
+#include <sys/stat.h>
 
-#include "utils/data_shard.h"
 namespace singa {
 
-DataShard::DataShard(std::string folder, char mode, int capacity){
-  struct stat sb;
-  if(stat(folder.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)){
-    LOG(INFO)<<"Open shard folder "<<folder;
-  }else{
-    LOG(FATAL)<<"Cannot open shard folder "<<folder;
-  }
+DataShard::DataShard(const std::string& folder, int mode)
+    : DataShard(folder, mode , 104857600) {}
 
-  path_= folder+"/shard.dat";
-  if(mode==DataShard::kRead){
-    fdat_.open(path_, std::ios::in|std::ios::binary);
-    CHECK(fdat_.is_open())<<"Cannot create file "<<path_;
-  }
-  if(mode==DataShard::kCreate){
-    fdat_.open(path_, std::ios::binary|std::ios::out|std::ios::trunc);
-    CHECK(fdat_.is_open())<<"Cannot create file "<<path_;
+DataShard::DataShard(const std::string& folder, int mode, int capacity) {
+  struct stat sb;
+  if (stat(folder.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) {
+    LOG(INFO) << "Open shard folder " << folder;
+  } else {
+    LOG(FATAL) << "Cannot open shard folder " << folder;
   }
-  if(mode==DataShard::kAppend){
-    int last_tuple=PrepareForAppend(path_);
-    fdat_.open(path_, std::ios::binary|std::ios::out|std::ios::in|std::ios::ate);
-    CHECK(fdat_.is_open())<<"Cannot create file "<<path_;
-    fdat_.seekp(last_tuple);
+  path_ = folder + "/shard.dat";
+  switch (mode) {
+    case DataShard::kRead: {
+      fdat_.open(path_, std::ios::in | std::ios::binary);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      break;
+    }
+    case DataShard::kCreate: {
+      fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      break;
+    }
+    case DataShard::kAppend: {
+      int last_tuple = PrepareForAppend(path_);
+      fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::in
+                 | std::ios::ate);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      fdat_.seekp(last_tuple);
+      break;
+    }
   }
-
-  mode_=mode;
-  offset_=0;
-  bufsize_=0;
-  capacity_=capacity;
-  buf_=new char[capacity];
+  mode_ = mode;
+  offset_ = 0;
+  bufsize_ = 0;
+  capacity_ = capacity;
+  buf_ = new char[capacity];
 }
 
-DataShard::~DataShard(){
+DataShard::~DataShard() {
   delete buf_;
   fdat_.close();
 }
 
-bool DataShard::Insert(const std::string& key, const Message& val) {
-  std::string str;
-  val.SerializeToString(&str);
-  return Insert(key, str);
-}
-// insert one complete tuple
-bool DataShard::Insert(const std::string& key, const std::string& val) {
-  if(keys_.find(key)!=keys_.end()||val.size()==0)
-    return false;
-  int size=key.size()+val.size()+2*sizeof(size_t);
-  if(offset_+size>capacity_){
-    fdat_.write(buf_, offset_);
-    offset_=0;
-    CHECK_LE(size, capacity_)<<"Tuple size is larger than capacity"
-      <<"Try a larger capacity size";
-  }
-  *reinterpret_cast<size_t*>(buf_+offset_)=key.size();
-  offset_+=sizeof(size_t);
-  memcpy(buf_+offset_, key.data(), key.size());
-  offset_+=key.size();
-  *reinterpret_cast<size_t*>(buf_+offset_)=val.size();
-  offset_+=sizeof(size_t);
-  memcpy(buf_+offset_, val.data(), val.size());
-  offset_+=val.size();
+bool DataShard::Next(std::string* key, google::protobuf::Message* val) {
+  int vallen = Next(key);
+  if (vallen == 0) return false;
+  val->ParseFromArray(buf_ + offset_, vallen);
+  offset_ += vallen;
   return true;
 }
 
-void DataShard::Flush() {
-  fdat_.write(buf_, offset_);
-  fdat_.flush();
-  offset_=0;
-}
-
-int DataShard::Next(std::string *key){
-  key->clear();
-  int ssize=sizeof(size_t);
-  if(!PrepareNextField(ssize))
-    return 0;
-  CHECK_LE(offset_+ssize, bufsize_);
-  int keylen=*reinterpret_cast<size_t*>(buf_+offset_);
-  offset_+=ssize;
-
-  if(!PrepareNextField(keylen))
-    return 0;
-  CHECK_LE(offset_+keylen, bufsize_);
-  for(int i=0;i<keylen;i++)
-    key->push_back(buf_[offset_+i]);
-  offset_+=keylen;
-
-  if(!PrepareNextField(ssize))
-    return 0;
-  CHECK_LE(offset_+ssize, bufsize_);
-  int vallen=*reinterpret_cast<size_t*>(buf_+offset_);
-  offset_+=ssize;
-
-  if(!PrepareNextField(vallen))
-    return 0;
-  CHECK_LE(offset_+vallen, bufsize_);
-  return vallen;
+bool DataShard::Next(std::string *key, std::string* val) {
+  int vallen = Next(key);
+  if (vallen == 0) return false;
+  val->clear();
+  for (int i = 0; i < vallen; ++i)
+    val->push_back(buf_[offset_ + i]);
+  offset_ += vallen;
+  return true;
 }
 
-bool DataShard::Next(std::string *key, Message* val) {
-  int vallen=Next(key);
-  if(vallen==0)
-    return false;
-  val->ParseFromArray(buf_+offset_, vallen);
-  offset_+=vallen;
-  return true;
+bool DataShard::Insert(const std::string& key,
+                       const google::protobuf::Message& val) {
+  std::string str;
+  val.SerializeToString(&str);
+  return Insert(key, str);
 }
 
-bool DataShard::Next(std::string *key, std::string* val) {
-  int vallen=Next(key);
-  if(vallen==0)
+// insert one complete tuple
+bool DataShard::Insert(const std::string& key, const std::string& val) {
+  if (keys_.find(key) != keys_.end() || val.size() == 0)
     return false;
-  val->clear();
-  for(int i=0;i<vallen;i++)
-    val->push_back(buf_[offset_+i]);
-  offset_+=vallen;
+  int size = key.size() + val.size() + 2*sizeof(size_t);
+  if (bufsize_ + size > capacity_) {
+    fdat_.write(buf_, bufsize_);
+    bufsize_ = 0;
+    CHECK_LE(size, capacity_) << "Tuple size is larger than capacity "
+      << "Try a larger capacity size";
+  }
+  *reinterpret_cast<size_t*>(buf_ + bufsize_) = key.size();
+  bufsize_ += sizeof(size_t);
+  memcpy(buf_ + bufsize_, key.data(), key.size());
+  bufsize_ += key.size();
+  *reinterpret_cast<size_t*>(buf_ + bufsize_) = val.size();
+  bufsize_ += sizeof(size_t);
+  memcpy(buf_ + bufsize_, val.data(), val.size());
+  bufsize_ += val.size();
   return true;
 }
 
-void DataShard::SeekToFirst(){
+void DataShard::SeekToFirst() {
   CHECK_EQ(mode_, kRead);
-  bufsize_=0;
-  offset_=0;
+  bufsize_ = 0;
+  offset_ = 0;
   fdat_.close();
-  fdat_.open(path_, std::ios::in|std::ios::binary);
-  CHECK(fdat_.is_open())<<"Cannot create file "<<path_;
+  fdat_.open(path_, std::ios::in | std::ios::binary);
+  CHECK(fdat_.is_open()) << "Cannot create file " << path_;
 }
 
-// if the buf does not have the next complete field, read data from disk
-bool DataShard::PrepareNextField(int size){
-  if(offset_+size>bufsize_){
-    bufsize_-=offset_;
-    CHECK_LE(bufsize_, offset_);
-    for(int i=0;i<bufsize_;i++)
-      buf_[i]=buf_[i+offset_];
-    offset_=0;
-    if(fdat_.eof())
-      return false;
-    else{
-      fdat_.read(buf_+bufsize_, capacity_-bufsize_);
-      bufsize_+=fdat_.gcount();
-    }
-  }
-  return true;
+void DataShard::Flush() {
+  fdat_.write(buf_, bufsize_);
+  fdat_.flush();
+  bufsize_ = 0;
 }
 
-const int DataShard::Count() {
-  std::ifstream fin(path_, std::ios::in|std::ios::binary);
-  CHECK(fdat_.is_open())<<"Cannot create file "<<path_;
-  int count=0;
-  while(true){
+int DataShard::Count() {
+  std::ifstream fin(path_, std::ios::in | std::ios::binary);
+  CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+  int count = 0;
+  while (true) {
     size_t len;
     fin.read(reinterpret_cast<char*>(&len), sizeof(len));
-    if(fin.good())
-      fin.seekg(len, std::ios_base::cur);
-    else break;
-    if(fin.good())
-      fin.read(reinterpret_cast<char*>(&len), sizeof(len));
-    else break;
-    if(fin.good())
-      fin.seekg(len, std::ios_base::cur);
-    else break;
-    if(!fin.good())
-      break;
+    if (!fin.good()) break;
+    fin.seekg(len, std::ios_base::cur);
+    if (!fin.good()) break;
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) break;
+    fin.seekg(len, std::ios_base::cur);
+    if (!fin.good()) break;
     count++;
   }
   fin.close();
   return count;
 }
 
-int DataShard::PrepareForAppend(std::string path){
-  std::ifstream fin(path, std::ios::in|std::ios::binary);
-  if(!fin.is_open()){
-    fdat_.open(path, std::ios::out|std::ios::binary);
-    fdat_.flush();
-    fdat_.close();
-    return 0;
-  }
+int DataShard::Next(std::string *key) {
+  key->clear();
+  int ssize = sizeof(size_t);
+  if (!PrepareNextField(ssize)) return 0;
+  int keylen = *reinterpret_cast<size_t*>(buf_ + offset_);
+  offset_ += ssize;
+  if (!PrepareNextField(keylen)) return 0;
+  for (int i = 0; i < keylen; ++i)
+    key->push_back(buf_[offset_ + i]);
+  offset_ += keylen;
+  if (!PrepareNextField(ssize)) return 0;
+  int vallen = *reinterpret_cast<size_t*>(buf_ + offset_);
+  offset_ += ssize;
+  if (!PrepareNextField(vallen)) return 0;
+  return vallen;
+}
 
-  int last_tuple_offset=0;
+int DataShard::PrepareForAppend(const std::string& path) {
+  std::ifstream fin(path, std::ios::in | std::ios::binary);
+  if (!fin.is_open()) return 0;
+  int last_tuple_offset = 0;
   char buf[256];
   size_t len;
-  while(true){
-    memset(buf, 0, 256);
+  while (true) {
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) break;
+    fin.read(buf, len);
+    buf[len] = '\0';
+    if (!fin.good()) break;
     fin.read(reinterpret_cast<char*>(&len), sizeof(len));
-    if(fin.good())
-      fin.read(buf, len);
-    else break;
-    if(fin.good())
-      fin.read(reinterpret_cast<char*>(&len), sizeof(len));
-    else break;
-    if(fin.good())
-      fin.seekg(len, std::ios_base::cur);
-    else break;
-    if(fin.good())
-      keys_.insert(std::string(buf));
-    else break;
-    last_tuple_offset=fin.tellg();
+    if (!fin.good()) break;
+    fin.seekg(len, std::ios_base::cur);
+    if (!fin.good()) break;
+    keys_.insert(std::string(buf));
+    last_tuple_offset = fin.tellg();
   }
   fin.close();
   return last_tuple_offset;
 }
-} /* singa */
+
+// if the buf does not have the next complete field, read data from disk
+bool DataShard::PrepareNextField(int size) {
+  if (offset_ + size > bufsize_) {
+    bufsize_ -= offset_;
+    // wangsh: commented, not sure what this check does
+    // CHECK_LE(bufsize_, offset_);
+    for (int i = 0; i < bufsize_; ++i)
+      buf_[i] = buf_[i + offset_];
+    offset_ = 0;
+    if (fdat_.eof()) {
+      return false;
+    } else {
+      fdat_.read(buf_ + bufsize_, capacity_ - bufsize_);
+      bufsize_ += fdat_.gcount();
+      if (size > bufsize_) return false;
+    }
+  }
+  return true;
+}
+
+}  // namespace singa


Mime
View raw message