singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [1/5] incubator-singa git commit: SINGA-82 Refactor input layers using data store abstraction
Date Wed, 07 Oct 2015 07:26:08 GMT
Repository: incubator-singa
Updated Branches:
  refs/heads/master 958dfebbe -> dc7f1996d


SINGA-82 Refactor input layers using data store abstraction

Add a Store abstraction for reading (and writing) data. Implemented two backends:

1. KVFile, which was previously named DataShard. It is a binary file in which
each tuple has a unique key.
2. TextFile, which is a plain text file with each line being the value
field of a tuple (the key is the line No.).

TODO: implement HDFS and image folder backends.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/d99b24cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/d99b24cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/d99b24cb

Branch: refs/heads/master
Commit: d99b24cb75def9fdbdc59273c4297abb75813c36
Parents: 958dfeb
Author: Wei Wang <wangwei@comp.nus.edu.sg>
Authored: Mon Oct 5 19:26:45 2015 +0800
Committer: wang sheng <wangsheng1001@gmail.com>
Committed: Wed Oct 7 15:19:58 2015 +0800

----------------------------------------------------------------------
 Makefile.am                    |  13 ++-
 include/io/hdfs_store.h        |  22 ++++
 include/io/imagefolder_store.h |  21 ++++
 include/io/kvfile.h            | 182 ++++++++++++++++++++++++++++++
 include/io/kvfile_store.h      |  53 +++++++++
 include/io/store.h             |  79 +++++++++++++
 include/io/textfile_store.h    |  49 ++++++++
 src/io/kvfile.cc               | 217 ++++++++++++++++++++++++++++++++++++
 src/io/kvfile_store.cc         |  71 ++++++++++++
 src/io/store.cc                |  57 ++++++++++
 src/io/textfile_store.cc       |  83 ++++++++++++++
 src/test/test_store.cc         |  92 +++++++++++++++
 12 files changed, 937 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index 77d6ded..f8e765d 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -46,7 +46,11 @@ SINGA_SRCS := src/driver.cc \
               src/neuralnet/output_layer.cc \
               src/neuralnet/neuralnet.cc \
               src/comm/socket.cc \
-              src/comm/msg.cc
+              src/comm/msg.cc \
+              src/io/kvfile.cc \
+              src/io/kvfile_store.cc \
+              src/io/textfile_store.cc \
+              src/io/store.cc
 
 SINGA_HDRS := include/singa.h \
               include/utils/cluster.h \
@@ -80,6 +84,10 @@ SINGA_HDRS := include/singa.h \
               include/mshadow/tensor_random.h \
               include/comm/msg.h \
-              include/comm/socket.h
+              include/comm/socket.h \
+              include/io/store.h \
+              include/io/kvfile.h \
+              include/io/kvfile_store.h \
+              include/io/textfile_store.h
 
 GTEST_SRCS := include/gtest/gtest-all.cc
 GTEST_HRDS := include/gtest/gtest.h
@@ -89,7 +97,8 @@ TEST_SRCS := include/gtest/gtest_main.cc \
 			 src/test/test_msg.cc \
 			 src/test/test_neuralnet.cc \
 			 src/test/test_paramslicer.cc \
-			 src/test/test_shard.cc
+			 src/test/test_shard.cc \
+			 src/test/test_store.cc
 
 #EXTRA_PROGRAMS = $(PROGS)
 EXTRA_PROGRAMS = singatest

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/hdfs_store.h
----------------------------------------------------------------------
diff --git a/include/io/hdfs_store.h b/include/io/hdfs_store.h
new file mode 100644
index 0000000..f85615b
--- /dev/null
+++ b/include/io/hdfs_store.h
@@ -0,0 +1,22 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+// TODO(wangwei) use hdfs as data storage

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/imagefolder_store.h
----------------------------------------------------------------------
diff --git a/include/io/imagefolder_store.h b/include/io/imagefolder_store.h
new file mode 100644
index 0000000..c05d92d
--- /dev/null
+++ b/include/io/imagefolder_store.h
@@ -0,0 +1,21 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+// TODO(wangwei) store images in a disk folder

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/kvfile.h
----------------------------------------------------------------------
diff --git a/include/io/kvfile.h b/include/io/kvfile.h
new file mode 100644
index 0000000..d70f198
--- /dev/null
+++ b/include/io/kvfile.h
@@ -0,0 +1,182 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_IO_KVFILE_H_
+#define SINGA_IO_KVFILE_H_
+
+#include <fstream>
+#include <string>
+#include <unordered_set>
+
+#define USE_PROTOBUF 1
+
+#ifdef USE_PROTOBUF
+#include <google/protobuf/message.h>
+#endif
+
+namespace singa { namespace io {
+
+
+/**
+ * KVFile stores training/validation/test tuples.
+ * Every worker node should have a KVFile for training data (validation/test
+ * KVFile is optional).
+ * KVFile consists of a set of unordered tuples. Each tuple is
+ * encoded as [key_len key val_len val], where key_len and val_len are raw
+ * size_t values (native width and byte order, e.g. 8 bytes on common 64-bit
+ * platforms) giving the bytes of key and value respectively.
+ *
+ * When opened in kAppend mode, the write position is set just past the last
+ * complete tuple, so an incomplete trailing tuple (e.g., left by a crashed
+ * write) is overwritten by subsequent inserts.
+ *
+ * KVFile owns a raw I/O buffer and is therefore not copyable.
+ *
+ * TODO(wangwei) split one KVFile into multiple KVFile s.
+ *
+ */
+class KVFile {
+ public:
+  enum Mode {
+    // read only mode used in training
+    kRead = 0,
+    // write mode used in creating KVFile (will overwrite previous one)
+    kCreate = 1,
+    // append mode, e.g. used when previous creating crashes
+    kAppend = 2
+  };
+
+  /**
+   * KVFile constructor.
+   *
+   * @param path path to the disk KVFile, it can be
+   *  - a path to local disk file.
+   *  - a path to local directory. This is to be compatible with the older
+   *    version (DataShard). The KVFile is shard.dat under that directory
+   *    (this fallback is only tried for kRead and kAppend).
+   *  - TODO: hdfs paths ("hdfs://...") are not supported yet.
+   * @param mode KVFile open mode, KVFile::kRead, KVFile::kCreate or
+   * KVFile::kAppend
+   * @param bufsize size in bytes of the internal buffer used for every disk
+   * op (read or write), default is 10MB. A single tuple, including its two
+   * length fields, must fit into this buffer.
+   */
+  KVFile(const std::string& path, Mode mode, int bufsize = 10485760);
+  ~KVFile();
+  KVFile(const KVFile&) = delete;
+  KVFile& operator=(const KVFile&) = delete;
+
+#ifdef USE_PROTOBUF
+  /**
+   * Read the next tuple from the KVFile.
+   *
+   * @param key Tuple key
+   * @param val Record of type Message, parsed from the value bytes
+   * @return false if no complete tuple could be read, e.g., end of file or
+   *         the tuple was not inserted completely.
+   */
+  bool Next(std::string* key, google::protobuf::Message* val);
+  /**
+   * Append one tuple to the KVFile; the message is serialized and stored as
+   * the value.
+   *
+   * @param key e.g., image path
+   * @param tuple record to store
+   * @return false if unsuccessful, e.g., the key was inserted before
+   */
+  bool Insert(const std::string& key, const google::protobuf::Message& tuple);
+#endif
+  /**
+   * Read the next tuple from the KVFile.
+   *
+   * @param key Tuple key
+   * @param val Record of type string
+   * @return false if unsuccessful, e.g. end of file or the tuple was not
+   *         inserted completely.
+   */
+  bool Next(std::string* key, std::string* val);
+  /**
+   * Append one tuple to the KVFile (buffered in memory; see Flush()).
+   *
+   * @param key e.g., image path
+   * @param tuple value bytes; must be non-empty
+   * @return false if unsuccessful, e.g., the key was inserted before or the
+   *         value is empty
+   */
+  bool Insert(const std::string& key, const std::string& tuple);
+  /**
+   * Move the read pointer to the head of the KVFile file.
+   * Used for repeated reading (kRead mode only).
+   */
+  void SeekToFirst();
+  /**
+   * Flush buffered data to disk.
+   * Used only for kCreate or kAppend.
+   */
+  void Flush();
+  /**
+   * Iterate through all tuples on disk to get the num of all tuples.
+   *
+   * @return num of tuples
+   */
+  int Count();
+  /**
+   * @return path to KVFile file ("/shard.dat" is appended if the given path
+   *         could only be opened as a directory)
+   */
+  inline std::string path() const { return path_; }
+
+ protected:
+  /**
+   * Read the next key and prepare buffer for reading value.
+   *
+   * @param key
+   * @return length (i.e., bytes) of value field, or 0 on failure.
+   */
+  int Next(std::string* key);
+  /**
+   * Setup the disk pointer to the right position for append in case that
+   * the previous write crashed. Existing keys are recorded in keys_.
+   *
+   * @param path KVFile path.
+   * @return offset (end pos) of the last successfully written record.
+   */
+  int PrepareForAppend(const std::string& path);
+  /**
+   * Read data from disk if the current data in the buffer is not a full field.
+   *
+   * @param size size of the next field.
+   * @return false if the field cannot be fully buffered.
+   */
+  bool PrepareNextField(int size);
+
+ private:
+  std::string path_ = "";
+  Mode mode_;
+  /// file stream opened for input or output depending on mode_
+  std::fstream fdat_;
+  /// keys used to reject replicated records
+  std::unordered_set<std::string> keys_;
+  /// internal buffer of capacity_ bytes
+  char* buf_ = nullptr;
+  /// read offset inside buf_
+  int offset_ = 0;
+  /// allocated bytes for buf_
+  int capacity_ = 0;
+  /// valid bytes in buf_ (pending writes, or data read from disk)
+  int bufsize_ = 0;
+};
+} /* io */
+
+/**
+ * @deprecated {ShardData is deprecated! Use KVFile}.
+using ShardData = KVFile;
+*/
+}  // namespace singa
+
+#endif  // SINGA_IO_KVFILE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/kvfile_store.h
----------------------------------------------------------------------
diff --git a/include/io/kvfile_store.h b/include/io/kvfile_store.h
new file mode 100644
index 0000000..bda7409
--- /dev/null
+++ b/include/io/kvfile_store.h
@@ -0,0 +1,53 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#ifndef SINGA_IO_KVFILE_STORE_H_
+#define SINGA_IO_KVFILE_STORE_H_
+
+#include <string>
+#include "io/store.h"
+#include "io/kvfile.h"
+
+namespace singa { namespace io {
+
+/**
+ * Use the KVFile as the data storage.
+ *
+ * KVFile is a binary file. Each tuple is stored as byte string.
+ * The store owns the underlying KVFile; it is released by Close() or, if
+ * Close() was not called, by the destructor.
+ */
+class KVFileStore : public Store {
+ public:
+  KVFileStore() = default;
+  /// Releases the KVFile if still open (KVFile flushes itself in write modes).
+  ~KVFileStore() override { Close(); }
+  // Owns a raw KVFile*, so copying would lead to a double delete.
+  KVFileStore(const KVFileStore&) = delete;
+  KVFileStore& operator=(const KVFileStore&) = delete;
+
+  bool Open(const std::string& source, Mode mode) override;
+  void Close() override;
+  bool Read(std::string* key, std::string* value) override;
+  void SeekToFirst() override;
+  bool Write(const std::string& key, const std::string& value) override;
+  void Flush() override;
+
+ private:
+  KVFile* file_ = nullptr;
+  Mode mode_ = kRead;
+};
+
+} /* io */
+} /* singa */
+#endif  // SINGA_IO_KVFILE_STORE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/store.h
----------------------------------------------------------------------
diff --git a/include/io/store.h b/include/io/store.h
new file mode 100644
index 0000000..8665af0
--- /dev/null
+++ b/include/io/store.h
@@ -0,0 +1,79 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_IO_STORE_H_
+#define SINGA_IO_STORE_H_
+
+#include <string>
+
+namespace singa { namespace io {
+// NOTE(review): this using-declaration in a header exposes `string` inside
+// singa::io for every includer; kept for compatibility with existing code.
+using std::string;
+/**
+ * Open mode of a Store: kCreate creates a new store (the existing backends
+ * truncate an existing file), kRead reads tuples, kAppend appends tuples.
+ */
+enum Mode { kCreate, kRead, kAppend };
+
+/**
+ * General key-value store that provides functions for reading and writing
+ * tuples.
+ *
+ * Subclasses implement the functions for a specific data storage, e.g., CSV
+ * file, HDFS, image folder, singa::io::KVFile, leveldb, lmdb, etc.
+ */
+class Store {
+ public:
+  Store() { }
+  virtual ~Store() { }
+  /**
+   * @param[in] source path to the storage, could be a file path, folder path
+   * or hdfs path, or even a http url.
+   * @param[in] mode open mode, see Mode.
+   * @return true if open successfully, otherwise false.
+   */
+  virtual bool Open(const std::string& source, Mode mode) = 0;
+  /**
+   * Release the underlying storage.
+   */
+  virtual void Close() = 0;
+  /**
+   * Read a tuple.
+   *
+   * @param[out] key
+   * @param[out] value
+   * @return true if read successfully, otherwise false.
+   */
+  virtual bool Read(std::string* key, std::string* value) = 0;
+  /**
+   * Seek the read header to the first tuple.
+   */
+  virtual void SeekToFirst() = 0;
+  /**
+   * Write a tuple.
+   *
+   * @param[in] key
+   * @param[in] value
+   * @return true if success, otherwise false.
+   */
+  virtual bool Write(const std::string& key, const std::string& value) = 0;
+  /**
+   * Flush writing buffer if it has. The default implementation does nothing.
+   */
+  virtual void Flush() {}
+};
+
+/**
+ * Create a Store for the given backend name, e.g., "textfile" or "kvfile".
+ *
+ * @param backend name of the storage backend.
+ * @return a newly allocated Store owned by the caller, or nullptr if the
+ * backend name is unknown or its backend was not compiled in.
+ */
+Store* CreateStore(const std::string& backend);
+} // namespace io
+} /* singa */
+#endif  // SINGA_IO_STORE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/include/io/textfile_store.h
----------------------------------------------------------------------
diff --git a/include/io/textfile_store.h b/include/io/textfile_store.h
new file mode 100644
index 0000000..4c020e9
--- /dev/null
+++ b/include/io/textfile_store.h
@@ -0,0 +1,49 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#ifndef SINGA_IO_TEXTFILE_STORE_H_
+#define SINGA_IO_TEXTFILE_STORE_H_
+
+#include <fstream>
+#include <string>
+#include "io/store.h"
+
+namespace singa { namespace io {
+/**
+ * Use text file as the data storage, one line per tuple.
+ *
+ * It is used for storing CSV format data where the key is the line No.
+ * (starting from 0) and the value is the line. Keys passed to Write() are
+ * not stored.
+ */
+class TextFileStore : public Store {
+ public:
+  bool Open(const std::string& source, Mode mode) override;
+  void Close() override;
+  bool Read(std::string* key, std::string* value) override;
+  void SeekToFirst() override;
+  bool Write(const std::string& key, const std::string& value) override;
+  void Flush() override;
+
+ private:
+  /// line number used as the key of the next tuple read
+  int lineNo_ = 0;
+  std::fstream* fs_ = nullptr;
+  Mode mode_ = kRead;
+};
+} /* io */
+
+} /* singa */
+
+#endif  // SINGA_IO_TEXTFILE_STORE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/kvfile.cc
----------------------------------------------------------------------
diff --git a/src/io/kvfile.cc b/src/io/kvfile.cc
new file mode 100644
index 0000000..aa52150
--- /dev/null
+++ b/src/io/kvfile.cc
@@ -0,0 +1,217 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "io/kvfile.h"
+
+#include <cstring>
+#include <string>
+
+#include <glog/logging.h>
+
+namespace singa { namespace io {
+/**
+ * Open the KVFile at `path` in the given mode and allocate an internal
+ * buffer of `capacity` bytes for buffered reads/writes.
+ *
+ * For kRead and kAppend, if `path` cannot be opened it is treated as a
+ * directory and `path + "/shard.dat"` is tried instead (DataShard layout).
+ * Failing to open the file is fatal (glog CHECK).
+ */
+KVFile::KVFile(const std::string& path, Mode mode, int capacity) :
+path_(path), mode_(mode), capacity_(capacity) {
+  buf_ = new char[capacity];
+  switch (mode) {
+    case KVFile::kRead:
+      fdat_.open(path_, std::ios::in | std::ios::binary);
+      if (!fdat_.is_open()) {
+        // path may be a directory
+        path_ = path + "/shard.dat";
+        fdat_.open(path_, std::ios::in | std::ios::binary);
+      }
+      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+      break;
+    case KVFile::kCreate:
+      fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      break;
+    case KVFile::kAppend:
+      fdat_.open(path_, std::ios::in | std::ios::binary);
+      if (!fdat_.is_open()) {
+        // path may be a directory
+        path_ = path + "/shard.dat";
+        fdat_.open(path_, std::ios::in | std::ios::binary);
+      }
+      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+      // The probe stream must be closed before re-opening fdat_ for writing:
+      // open() on an already-open fstream fails and sets failbit.
+      fdat_.close();
+      {
+        int last_tuple = PrepareForAppend(path_);
+        fdat_.open(path_, std::ios::binary | std::ios::out
+            | std::ios::in | std::ios::ate);
+        CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+        // Start writing right after the last complete tuple, overwriting any
+        // incomplete tuple left by a crashed write.
+        fdat_.seekp(last_tuple);
+      }
+      break;
+    default:
+      LOG(FATAL) << "unknown mode to open KVFile " << mode;
+      break;
+  }
+}
+
+/**
+ * Flush pending writes (kCreate/kAppend), release the buffer and close the
+ * file.
+ */
+KVFile::~KVFile() {
+  if (mode_ != kRead)
+    Flush();
+  // buf_ was allocated with new[], so it must be released with delete[].
+  delete[] buf_;
+  fdat_.close();
+}
+#ifdef USE_PROTOBUF
+// Read the next tuple and parse its value into `val`. Returns false if no
+// complete tuple is available or the value bytes fail to parse.
+bool KVFile::Next(std::string* key, google::protobuf::Message* val) {
+  int vallen = Next(key);
+  if (vallen == 0) return false;
+  bool parsed = val->ParseFromArray(buf_ + offset_, vallen);
+  // Advance past the value even on a parse failure so the read position
+  // stays aligned with the next tuple.
+  offset_ += vallen;
+  return parsed;
+}
+
+// Serialize `val` and insert it as the tuple value. Returns false if
+// serialization fails or the string Insert rejects the tuple.
+bool KVFile::Insert(const std::string& key,
+    const google::protobuf::Message& val) {
+  std::string str;
+  if (!val.SerializeToString(&str)) return false;
+  return Insert(key, str);
+}
+#endif
+
+// Read the next tuple's key and value as raw byte strings. Returns false if
+// no complete tuple is available.
+bool KVFile::Next(std::string *key, std::string* val) {
+  int vallen = Next(key);
+  if (vallen == 0) return false;
+  // Copy the value bytes in one call instead of a per-char push_back loop.
+  val->assign(buf_ + offset_, vallen);
+  offset_ += vallen;
+  return true;
+}
+
+// Insert one complete tuple into the write buffer. The buffer is written to
+// disk when it cannot hold the new tuple (or on Flush()).
+// Returns false if the key was inserted before or the value is empty.
+bool KVFile::Insert(const std::string& key, const std::string& val) {
+  if (keys_.find(key) != keys_.end() || val.size() == 0)
+    return false;
+  int size = key.size() + val.size() + 2*sizeof(size_t);
+  if (bufsize_ + size > capacity_) {
+    fdat_.write(buf_, bufsize_);
+    bufsize_ = 0;
+    CHECK_LE(size, capacity_) << "Tuple size is larger than capacity "
+      << "Try a larger capacity size";
+  }
+  // Length fields are copied with memcpy because buf_ + bufsize_ is not
+  // necessarily suitably aligned for a size_t store.
+  size_t len = key.size();
+  memcpy(buf_ + bufsize_, &len, sizeof(len));
+  bufsize_ += sizeof(len);
+  memcpy(buf_ + bufsize_, key.data(), key.size());
+  bufsize_ += key.size();
+  len = val.size();
+  memcpy(buf_ + bufsize_, &len, sizeof(len));
+  bufsize_ += sizeof(len);
+  memcpy(buf_ + bufsize_, val.data(), val.size());
+  bufsize_ += val.size();
+  // Remember the key so that later duplicates are rejected.
+  keys_.insert(key);
+  return true;
+}
+
+// Rewind to the first tuple (kRead mode only); buffered data is discarded.
+void KVFile::SeekToFirst() {
+  CHECK_EQ(mode_, kRead);
+  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+  bufsize_ = 0;
+  offset_ = 0;
+  // Clear eof/fail flags left by the previous pass before seeking.
+  fdat_.clear();
+  fdat_.seekg(0);
+}
+
+// Write buffered tuples to disk. Only meaningful in kCreate/kAppend mode; in
+// kRead mode buf_ holds read-ahead data, so it is left untouched.
+void KVFile::Flush() {
+  if (mode_ == kRead) return;
+  fdat_.write(buf_, bufsize_);
+  fdat_.flush();
+  bufsize_ = 0;
+}
+
+// Count tuples by scanning the file on disk with a separate stream. Tuples
+// still held in the write buffer (not yet flushed) are not counted.
+// NOTE(review): seeking past EOF may not be reported as an error, so a
+// truncated final value could still be counted — confirm if exactness matters.
+int KVFile::Count() {
+  std::ifstream fin(path_, std::ios::in | std::ios::binary);
+  CHECK(fin.is_open()) << "Cannot open file " << path_;
+  int count = 0;
+  while (true) {
+    size_t len;
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) break;
+    fin.seekg(len, std::ios_base::cur);
+    if (!fin.good()) break;
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) break;
+    fin.seekg(len, std::ios_base::cur);
+    if (!fin.good()) break;
+    count++;
+  }
+  fin.close();
+  return count;
+}
+
+// Read the next key into *key and make sure the following value field is
+// fully buffered at buf_ + offset_. Returns the value length in bytes, or 0
+// if no complete tuple is available.
+int KVFile::Next(std::string *key) {
+  key->clear();
+  const int ssize = sizeof(size_t);
+  size_t len = 0;
+  if (!PrepareNextField(ssize)) return 0;
+  // memcpy avoids an unaligned size_t load from buf_ + offset_.
+  memcpy(&len, buf_ + offset_, ssize);
+  int keylen = static_cast<int>(len);
+  offset_ += ssize;
+  if (!PrepareNextField(keylen)) return 0;
+  key->assign(buf_ + offset_, keylen);
+  offset_ += keylen;
+  if (!PrepareNextField(ssize)) return 0;
+  memcpy(&len, buf_ + offset_, ssize);
+  int vallen = static_cast<int>(len);
+  offset_ += ssize;
+  if (!PrepareNextField(vallen)) return 0;
+  return vallen;
+}
+
+// Scan the existing file, record every complete key in keys_ (for duplicate
+// detection) and return the byte offset just past the last complete tuple.
+// Returns 0 if the file cannot be opened.
+int KVFile::PrepareForAppend(const std::string& path) {
+  std::ifstream fin(path, std::ios::in | std::ios::binary);
+  if (!fin.is_open()) return 0;
+  // Total file size, used to detect length fields that point past EOF.
+  fin.seekg(0, std::ios::end);
+  const std::streamoff file_size = fin.tellg();
+  fin.seekg(0, std::ios::beg);
+  int last_tuple_offset = 0;
+  std::string key;
+  size_t len;
+  while (true) {
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) break;
+    std::streamoff pos = fin.tellg();
+    // A key longer than the remaining bytes means the tuple is truncated.
+    if (len > static_cast<size_t>(file_size - pos)) break;
+    // Read into a string sized to fit; a fixed-size stack buffer would
+    // overflow for long keys.
+    key.assign(len, '\0');
+    fin.read(&key[0], len);
+    if (!fin.good()) break;
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) break;
+    pos = fin.tellg();
+    // Seeking past EOF may not be reported as an error, so check the value
+    // length explicitly to reject a truncated value.
+    if (len > static_cast<size_t>(file_size - pos)) break;
+    fin.seekg(len, std::ios_base::cur);
+    if (!fin.good()) break;
+    keys_.insert(key);
+    last_tuple_offset = static_cast<int>(fin.tellg());
+  }
+  fin.close();
+  return last_tuple_offset;
+}
+
+// Make sure the next `size` bytes starting at buf_ + offset_ are buffered.
+// If they are not, the unread tail of buf_ is moved to the front, offset_ is
+// reset to 0 and the remaining space of buf_ is refilled from disk.
+// Returns false if the stream already hit EOF, or if after the refill fewer
+// than `size` bytes are buffered (e.g., a truncated last tuple, or a field
+// larger than capacity_).
+bool KVFile::PrepareNextField(int size) {
+  if (offset_ + size > bufsize_) {
+    // bufsize_ now counts the unread bytes left in the buffer.
+    bufsize_ -= offset_;
+    // NOTE(review): the disabled check below would assert that the unread
+    // tail [offset_, offset_ + bufsize_) does not overlap its destination
+    // [0, bufsize_). Overlap is harmless here because the forward copy loop
+    // always reads at or ahead of the index it writes.
+    // CHECK_LE(bufsize_, offset_);
+    for (int i = 0; i < bufsize_; ++i)
+      buf_[i] = buf_[i + offset_];
+    offset_ = 0;
+    if (fdat_.eof()) {
+      return false;
+    } else {
+      // Fill the free space after the kept tail.
+      fdat_.read(buf_ + bufsize_, capacity_ - bufsize_);
+      bufsize_ += fdat_.gcount();
+      if (size > bufsize_) return false;
+    }
+  }
+  return true;
+}
+
+} /* io */
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/kvfile_store.cc
----------------------------------------------------------------------
diff --git a/src/io/kvfile_store.cc b/src/io/kvfile_store.cc
new file mode 100644
index 0000000..11609bf
--- /dev/null
+++ b/src/io/kvfile_store.cc
@@ -0,0 +1,71 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#include "io/kvfile_store.h"
+#include <glog/logging.h>
+
+namespace singa { namespace io {
+
+// Open `source` as a KVFile using the KVFile mode matching `mode`.
+// Returns false only if no KVFile was created (unrecognized mode value).
+bool KVFileStore::Open(const std::string& source, Mode mode) {
+  CHECK(file_ == nullptr);
+  switch (mode) {
+    case kRead:
+      file_ = new KVFile(source, KVFile::kRead);
+      break;
+    case kCreate:
+      file_ = new KVFile(source, KVFile::kCreate);
+      break;
+    case kAppend:
+      file_ = new KVFile(source, KVFile::kAppend);
+      break;
+  }
+  mode_ = mode;
+  return file_ != nullptr;
+}
+
+// Destroy the underlying KVFile (which flushes it in write modes).
+void KVFileStore::Close() {
+  // delete on a null pointer is a no-op, so no explicit guard is needed.
+  delete file_;
+  file_ = nullptr;
+}
+
+// Read the next tuple; the store must have been opened with kRead.
+bool KVFileStore::Read(std::string* key, std::string* value) {
+  CHECK_EQ(mode_, kRead);
+  CHECK(file_ != nullptr);
+  const bool got_tuple = file_->Next(key, value);
+  return got_tuple;
+}
+
+// Rewind reading to the first tuple (kRead mode only).
+void KVFileStore::SeekToFirst() {
+  CHECK_EQ(mode_, kRead);
+  CHECK(file_ != nullptr);
+  KVFile* const file = file_;
+  file->SeekToFirst();
+}
+// Append a tuple (not allowed in kRead mode); returns KVFile::Insert's
+// result, e.g. false for an empty value.
+bool KVFileStore::Write(const std::string& key, const std::string& value) {
+  CHECK_NE(mode_, kRead);
+  CHECK(file_ != nullptr);
+  const bool inserted = file_->Insert(key, value);
+  return inserted;
+}
+
+// Push buffered tuples to disk (write modes only).
+void KVFileStore::Flush() {
+  CHECK_NE(mode_, kRead);
+  CHECK(file_!= nullptr);
+  KVFile* const file = file_;
+  file->Flush();
+}
+
+} /* io */
+
+} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/store.cc
----------------------------------------------------------------------
diff --git a/src/io/store.cc b/src/io/store.cc
new file mode 100644
index 0000000..6412628
--- /dev/null
+++ b/src/io/store.cc
@@ -0,0 +1,57 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#include "io/store.h"
+#include "io/kvfile_store.h"
+#include "io/textfile_store.h"
+
+namespace singa { namespace io {
+/**
+ * Factory for Store backends. Returns a heap-allocated store for the given
+ * backend name (caller owns it), or nullptr if the name is not recognized or
+ * the backend was not compiled in.
+ */
+Store* CreateStore(const std::string& backend) {
+  if (backend == "textfile")
+    return new TextFileStore();
+  if (backend == "kvfile")
+    return new KVFileStore();
+
+#ifdef USE_LMDB
+  if (backend == "lmdb")
+    return new LMDBStore();
+#endif
+
+#ifdef USE_OPENCV
+  if (backend == "imagefolder")
+    return new ImageFolderStore();
+#endif
+
+#ifdef USE_HDFS
+  if (backend == "hdfs")
+    return new HDFSStore();
+#endif
+  return nullptr;
+}
+} /* io */
+
+} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/io/textfile_store.cc
----------------------------------------------------------------------
diff --git a/src/io/textfile_store.cc b/src/io/textfile_store.cc
new file mode 100644
index 0000000..74ec9a4
--- /dev/null
+++ b/src/io/textfile_store.cc
@@ -0,0 +1,83 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+
+#include "io/textfile_store.h"
+#include <glog/logging.h>
+
+namespace singa { namespace io {
+// Open `source` as a text store: kRead opens it for reading, kCreate
+// creates/truncates it, kAppend appends to it. Line numbering (the read key)
+// restarts at 0. Returns false if the file could not be opened or the mode
+// value is not recognized.
+bool TextFileStore::Open(const std::string& source, Mode mode) {
+  if (mode == kRead) {
+    fs_ = new std::fstream(source, std::fstream::in);
+  } else if (mode == kCreate) {
+    fs_ = new std::fstream(source, std::fstream::out);
+  } else if (mode == kAppend) {
+    fs_ = new std::fstream(source, std::fstream::out | std::fstream::app);
+  } else {
+    // Unknown mode: nothing was opened, so fs_ must not be dereferenced.
+    return false;
+  }
+  lineNo_ = 0;
+  mode_ = mode;
+  return fs_->is_open();
+}
+
+// Close and release the underlying stream. Safe to call more than once.
+void TextFileStore::Close() {
+  if (fs_ != nullptr) {
+    if (fs_->is_open()) {
+      fs_->close();
+    }
+    delete fs_;
+    // Reset so a repeated Close() does not delete the stream twice.
+    fs_ = nullptr;
+  }
+}
+
+// Read the next line as the value; the key is the 0-based line number.
+// Returns false at end of file; any other read failure is fatal.
+bool TextFileStore::Read(std::string* key, std::string* value) {
+  CHECK_EQ(mode_, kRead);
+  CHECK(fs_ != nullptr);
+  CHECK(value != nullptr);
+  CHECK(key != nullptr);
+  const bool got_line = static_cast<bool>(std::getline(*fs_, *value));
+  if (!got_line && fs_->eof())
+    return false;
+  if (!got_line)
+    LOG(FATAL) << "error in reading csv file";
+  *key = std::to_string(lineNo_);
+  ++lineNo_;
+  return true;
+}
+
+// Rewind to the first line and restart key numbering at 0.
+void TextFileStore::SeekToFirst() {
+  CHECK_EQ(mode_, kRead);
+  CHECK(fs_ != nullptr);
+  fs_->clear();  // drop eof/fail flags from the previous pass
+  fs_->seekg(0);
+  lineNo_ = 0;
+}
+
+// Append `value` as one line (not allowed in kRead mode). The key is not
+// stored; on read, keys are regenerated from line numbers.
+// Returns false if the stream is in a failed state after writing.
+bool TextFileStore::Write(const std::string& key, const std::string& value) {
+  CHECK_NE(mode_, kRead);
+  CHECK(fs_ != nullptr);
+  // csv store does not write key
+  *fs_ << value << '\n';
+  return !fs_->fail();
+}
+
+// Flush buffered output to the file; the store must be open.
+void TextFileStore::Flush() {
+  CHECK(fs_ != nullptr);
+  fs_->flush();
+}
+
+} /* io */
+
+} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/d99b24cb/src/test/test_store.cc
----------------------------------------------------------------------
diff --git a/src/test/test_store.cc b/src/test/test_store.cc
new file mode 100644
index 0000000..f69aebb
--- /dev/null
+++ b/src/test/test_store.cc
@@ -0,0 +1,92 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+#include <string>
+#include "gtest/gtest.h"
+#include "io/store.h"
+
+TEST(TextFileStore, Open) {
+  // Creating the file must succeed, and the created file must then be
+  // openable for reading through the same store object.
+  const std::string path = "src/test/store.txt";
+  auto store = singa::io::CreateStore("textfile");
+  EXPECT_TRUE(store->Open(path, singa::io::kCreate));
+  store->Close();
+  EXPECT_TRUE(store->Open(path, singa::io::kRead));
+  store->Close();
+}
+
+TEST(TextFileStore, Write) {
+  // Produces the three-line file consumed by the Read test below. The
+  // textfile backend ignores the keys and writes one value per line.
+  auto store = singa::io::CreateStore("textfile");
+  // Without a successful Open there is nothing meaningful to test.
+  ASSERT_TRUE(store->Open("src/test/store.txt", singa::io::kCreate));
+  EXPECT_TRUE(store->Write("001", "first tuple"));
+  EXPECT_TRUE(store->Write("002", "second tuple"));
+  store->Flush();  // exercise Flush() between writes
+  EXPECT_TRUE(store->Write("003", "third tuple"));
+  store->Close();
+}
+
+TEST(TextFileStore, Read) {
+  // Reads back the file written by the Write test above. Keys are the
+  // zero-based line numbers generated by TextFileStore::Read.
+  auto store = singa::io::CreateStore("textfile");
+  ASSERT_TRUE(store->Open("src/test/store.txt", singa::io::kRead));
+  std::string key, value;
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "0");
+  EXPECT_EQ(value, "first tuple");
+
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "1");
+  EXPECT_EQ(value, "second tuple");
+
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "2");
+  EXPECT_EQ(value, "third tuple");
+
+  // All three lines consumed: end of file.
+  EXPECT_FALSE(store->Read(&key, &value));
+
+  // Rewinding restarts both the data and the key numbering.
+  store->SeekToFirst();
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "0");
+  EXPECT_EQ(value, "first tuple");
+  store->Close();
+}
+TEST(KVFileStore, Open) {
+  // A freshly created binary store file must be reopenable for reading.
+  const std::string path = "src/test/store.bin";
+  auto store = singa::io::CreateStore("kvfile");
+  EXPECT_TRUE(store->Open(path, singa::io::kCreate));
+  store->Close();
+  EXPECT_TRUE(store->Open(path, singa::io::kRead));
+  store->Close();
+}
+TEST(KVFileStore, Write) {
+  // Produces the three-tuple file consumed by the KVFileStore Read test
+  // below; every key is distinct.
+  auto store = singa::io::CreateStore("kvfile");
+  // Without a successful Open there is nothing meaningful to test.
+  ASSERT_TRUE(store->Open("src/test/store.bin", singa::io::kCreate));
+  EXPECT_TRUE(store->Write("001", "first tuple"));
+  EXPECT_TRUE(store->Write("002", "second tuple"));
+  store->Flush();  // exercise Flush() between writes
+  EXPECT_TRUE(store->Write("003", "third tuple"));
+  store->Close();
+}
+TEST(KVFileStore, Read) {
+  // Reads back the tuples written by the KVFileStore Write test above.
+  // Unlike the textfile backend, the stored keys are returned verbatim.
+  auto store = singa::io::CreateStore("kvfile");
+  ASSERT_TRUE(store->Open("src/test/store.bin", singa::io::kRead));
+  std::string key, value;
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "001");
+  EXPECT_EQ(value, "first tuple");
+
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "002");
+  EXPECT_EQ(value, "second tuple");
+
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "003");
+  EXPECT_EQ(value, "third tuple");
+
+  // All three tuples consumed.
+  EXPECT_FALSE(store->Read(&key, &value));
+
+  // Rewinding must return the first tuple again.
+  store->SeekToFirst();
+  ASSERT_TRUE(store->Read(&key, &value));
+  EXPECT_EQ(key, "001");
+  EXPECT_EQ(value, "first tuple");
+  store->Close();
+}


Mime
View raw message