singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [1/2] incubator-singa git commit: SINGA-202 Add reader and writer for binary file
Date Thu, 23 Jun 2016 13:52:30 GMT
Repository: incubator-singa
Updated Branches:
  refs/heads/dev 833f46195 -> 14d31a44f


SINGA-202 Add reader and writer for binary file

Add IO base classes (Reader and Writer).
Add BinFileReader and BinFileWriter for reading and writing
  binary files of key-value tuples.
Pass a simple write/read test for the binary (KVFile) format.

Change CMakeLists under src and test folders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/913f45ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/913f45ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/913f45ce

Branch: refs/heads/dev
Commit: 913f45ce29c233db1063e18b27f8d5c72d680588
Parents: 833f461
Author: XiangruiCAI <caixr91@gmail.com>
Authored: Tue Jun 21 16:55:14 2016 +0800
Committer: XiangruiCAI <caixr91@gmail.com>
Committed: Thu Jun 23 21:43:30 2016 +0800

----------------------------------------------------------------------
 include/singa/io/reader.h     |  99 +++++++++++++++++++++++++++
 include/singa/io/writer.h     | 112 ++++++++++++++++++++++++++++++
 src/io/binfile_reader.cc      | 113 ++++++++++++++++++++++++++++++
 src/io/binfile_writer.cc      | 136 +++++++++++++++++++++++++++++++++++++
 test/singa/test_binfile_rw.cc |  95 ++++++++++++++++++++++++++
 5 files changed, 555 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/include/singa/io/reader.h
----------------------------------------------------------------------
diff --git a/include/singa/io/reader.h b/include/singa/io/reader.h
new file mode 100644
index 0000000..f693da2
--- /dev/null
+++ b/include/singa/io/reader.h
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SINGA_IO_READER_H_
+#define SINGA_IO_READER_H_
+
+#include <cstring>
+#include <fstream>
+#include <string>
+
+namespace singa {
+namespace io {
+
+using std::string;
+
+/// Abstract reader of key-value tuples.
+/// Concrete subclasses bind it to a particular storage backend, e.g. CSV
+/// file, HDFS, kvfile, leveldb or lmdb.
+class Reader {
+ public:
+  /// Subclasses may release resources (e.g. memory) here in case the user
+  /// never called Close().
+  virtual ~Reader() = default;
+
+  /// Open the storage located at `path`, which may be a file path, a
+  /// database connection or an hdfs path. `capacity` defaults to 10485760
+  /// bytes (10MB); how it is used is up to the subclass (BinFileReader
+  /// allocates its read buffer with that many bytes).
+  /// Returns true if the storage was opened, false otherwise.
+  virtual bool Open(const std::string& path, int capacity = 10485760) = 0;
+
+  /// Release every resource held by the reader.
+  virtual void Close() = 0;
+
+  /// Fetch the next tuple into `*key` and `*value`.
+  /// Returns true if a tuple was read, false otherwise.
+  virtual bool Read(std::string* key, std::string* value) = 0;
+
+  /// Walk over the whole storage and report how many tuples it holds.
+  virtual int Count() = 0;
+};
+
+/// Reader for the binary key-value file format produced by BinFileWriter.
+/// Bytes are pulled from disk in chunks of up to `capacity` bytes into an
+/// internal buffer, from which tuples are decoded.
+class BinFileReader : public Reader {
+ public:
+  /// Releases the buffer and closes the file if the caller did not.
+  ~BinFileReader() override { Close(); }
+  /// \copydoc Reader::Open(const std::string&, int)
+  bool Open(const std::string& path, int capacity = 10485760) override;
+  /// \copydoc Reader::Close()
+  void Close() override;
+  /// \copydoc Reader::Read(std::string*, std::string*)
+  bool Read(std::string* key, std::string* value) override;
+  /// \copydoc Reader::Count()
+  int Count() override;
+  /// return path to binary file
+  inline std::string path() const { return path_; }
+
+ protected:
+  /// Read the next field, i.e. its length prefix (content_len) followed by
+  /// the content bytes, into `*content`.
+  /// return true if succeed.
+  bool ReadField(std::string* content);
+
+  /// Read data from disk if the current data in the buffer is not a full field.
+  /// size is the size of the next field in bytes.
+  /// return true if at least `size` unread bytes are buffered afterwards.
+  bool PrepareNextField(int size);
+
+ private:
+  std::string path_ = "";
+  /// file to be read
+  std::ifstream fdat_;
+  /// internal buffer
+  char* buf_ = nullptr;
+  /// offset inside the buf_ of the first unread byte
+  int offset_ = 0;
+  /// allocated bytes for the buf_
+  int capacity_ = 0;
+  /// bytes in buf_
+  int bufsize_ = 0;
+  /// magic word: first two bytes of every tuple header
+  const char kMagicWord[2] = {'s', 'g'};
+};
+
+}  // namespace io
+}  // namespace singa
+
+#endif  // SINGA_IO_READER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/include/singa/io/writer.h
----------------------------------------------------------------------
diff --git a/include/singa/io/writer.h b/include/singa/io/writer.h
new file mode 100644
index 0000000..a847ead
--- /dev/null
+++ b/include/singa/io/writer.h
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SINGA_IO_WRITER_H_
+#define SINGA_IO_WRITER_H_
+
+#include <string>
+#include <cstring>
+#include <fstream>
+
+namespace singa {
+namespace io {
+
+using std::string;
+/// Open mode of a Writer. As implemented by BinFileWriter, kCreate creates
+/// (or truncates) the target and kAppend adds tuples to existing data.
+enum Mode { kCreate = 0, kAppend = 1 };
+
+/// Abstract writer of key-value tuples.
+/// Concrete subclasses bind it to a particular storage backend, e.g. CSV
+/// file, HDFS, image folder, leveldb or lmdb.
+class Writer {
+ public:
+  /// Subclasses may release resources (e.g. memory) here in case the user
+  /// never called Close().
+  virtual ~Writer() = default;
+
+  /// Open the storage at `path` in the given `mode` (kCreate or kAppend).
+  /// Depending on the subclass, `path` may be
+  ///  - a path to a local disk file;
+  ///  - a path to a local directory, for compatibility with the older
+  ///    DataShard layout (the KVFile is shard.dat under that directory);
+  ///  - an hdfs file whose path starts with "hdfs://".
+  /// `capacity` is the number of bytes cached for every disk operation
+  /// (read or write); it defaults to 10485760 bytes (10MB).
+  /// Returns true if the storage was opened.
+  virtual bool Open(const std::string &path, Mode mode,
+                    int capacity = 10485760) = 0;
+
+  /// Release every resource held by the writer.
+  virtual void Close() = 0;
+
+  /// Store one key-value tuple; returns true on success, false otherwise.
+  virtual bool Write(const std::string &key, const std::string &value) = 0;
+
+  /// Push buffered data, if any, to the underlying storage.
+  virtual void Flush() = 0;
+};
+
+/// BinFile stores training/validation/test tuples.
+/// Each tuple is encoded as [magic_word, key_len, key, val_len, val]:
+///  - magic_word has 4 bytes: the first two are 's' and 'g', the third one is
+///    1 if the tuple has a key and 0 otherwise, the last one is reserved for
+///    future use (written as 0);
+///  - key_len and val_len are raw size_t values (host width and byte order)
+///    giving the number of bytes of key and value respectively;
+///  - key_len and key are omitted when the key is empty.
+/// Empty values are rejected by Write().
+/// When opened in kAppend mode, writing resumes right after the last complete
+/// tuple, so a trailing tuple left incomplete by a crashed write is
+/// overwritten. NOTE: the file is not truncated, so if fewer bytes are
+/// written than the broken tail occupied, stale bytes remain after them.
+class BinFileWriter : public Writer {
+ public:
+  /// Flushes buffered tuples and releases resources if Close() was not called.
+  ~BinFileWriter() override { Close(); }
+  /// \copydoc Writer::Open(const std::string&, Mode, int)
+  bool Open(const std::string &path, Mode mode,
+            int capacity = 10485760) override;
+  /// \copydoc Writer::Close()
+  void Close() override;
+  /// \copydoc Writer::Write(const std::string&, const std::string&)
+  bool Write(const std::string &key, const std::string &value) override;
+  /// \copydoc Writer::Flush()
+  void Flush() override;
+  /// return path to binary file
+  inline std::string path() const { return path_; }
+
+ protected:
+  /// Find where appending should start in case the previous write crashed.
+  /// return offset (end pos) of the last successfully written record.
+  int PrepareForAppend(const std::string &path);
+
+ private:
+  std::string path_ = "";
+  /// mode passed to the last Open()
+  Mode mode_ = kCreate;
+  /// file to be written
+  std::ofstream fdat_;
+  /// internal buffer
+  char *buf_ = nullptr;
+  /// allocated bytes for the buf_
+  int capacity_ = 0;
+  /// bytes in buf_
+  int bufsize_ = 0;
+  /// magic word: first two bytes of every tuple header
+  const char kMagicWord[2]= {'s', 'g'};
+};
+
+}  // namespace io
+}  // namespace singa
+
+#endif  // SINGA_IO_WRITER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/src/io/binfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/io/binfile_reader.cc b/src/io/binfile_reader.cc
new file mode 100644
index 0000000..6a29540
--- /dev/null
+++ b/src/io/binfile_reader.cc
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "singa/io/reader.h"
+#include "singa/utils/logging.h"
+
+namespace singa {
+namespace io {
+/// Open `path` for reading, using an internal buffer of `capacity` bytes.
+/// Re-opening an already opened reader releases the previous file first.
+/// Returns true if the file is open (a failure aborts via CHECK).
+bool BinFileReader::Open(const std::string& path, int capacity) {
+  CHECK(capacity > 0) << "Buffer capacity must be positive, got " << capacity;
+  // Release the buffer and file of any previous Open() so they do not leak.
+  Close();
+  path_ = path;
+  capacity_ = capacity;
+  buf_ = new char[capacity_];
+  // Start with an empty buffer: stale offsets from a previous file would make
+  // PrepareNextField() copy bytes that were never read from this file.
+  offset_ = 0;
+  bufsize_ = 0;
+  fdat_.clear();
+  fdat_.open(path_, std::ios::in | std::ios::binary);
+  CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+  return fdat_.is_open();
+}
+
+/// Free the read buffer, forget buffered bytes and close the file.
+/// Safe to call more than once.
+void BinFileReader::Close() {
+  // delete[] on nullptr is a no-op, so no guard is needed.
+  delete[] buf_;
+  buf_ = nullptr;
+  // Drop any buffered state so a later Open() starts from a clean buffer.
+  offset_ = 0;
+  bufsize_ = 0;
+  if (fdat_.is_open()) fdat_.close();
+}
+
+/// Decode the next tuple into `*key` and `*value`.
+/// `*key` is cleared for tuples stored without a key.
+/// Returns false at end of data or on a malformed tuple header.
+bool BinFileReader::Read(std::string* key, std::string* value) {
+  CHECK(fdat_.is_open()) << "File not open!";
+  char magic[4];
+  const int smagic = sizeof(magic);
+  if (!PrepareNextField(smagic)) return false;
+  memcpy(magic, buf_ + offset_, smagic);
+  offset_ += smagic;
+
+  // Reject data that does not start with the magic word instead of reporting
+  // success while leaving stale contents in *key / *value.
+  if (magic[0] != kMagicWord[0] || magic[1] != kMagicWord[1]) return false;
+  // magic[2] flags whether the tuple carries a key (1) or not (0).
+  if (magic[2] == 1) {
+    if (!ReadField(key)) return false;
+  } else if (magic[2] == 0) {
+    // Do not hand back the key of a previous tuple.
+    key->clear();
+  } else {
+    return false;
+  }
+  return ReadField(value);
+}
+
+/// Count the complete, well-formed tuples in the file.
+/// Counting stops at the first tuple that Read() would reject or that is
+/// truncated.
+int BinFileReader::Count() {
+  // Use a separate stream so the read position of fdat_ is left untouched.
+  std::ifstream fin(path_, std::ios::in | std::ios::binary);
+  CHECK(fin.is_open()) << "Cannot open file " << path_;
+  // Seeking past EOF does not set failbit, so compare positions against the
+  // real file size; otherwise a tuple truncated by a crashed write would be
+  // counted.
+  fin.seekg(0, std::ios_base::end);
+  const std::streamoff file_size = fin.tellg();
+  fin.seekg(0, std::ios_base::beg);
+
+  // Reads a size_t length prefix and skips that many bytes; returns false
+  // if either the prefix or the payload is incomplete.
+  auto skip_field = [&fin, file_size]() -> bool {
+    size_t len = 0;
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) return false;
+    const std::streamoff pos = fin.tellg();
+    if (pos < 0 || pos > file_size ||
+        len > static_cast<size_t>(file_size - pos))
+      return false;
+    fin.seekg(static_cast<std::streamoff>(len), std::ios_base::cur);
+    return fin.good();
+  };
+
+  int count = 0;
+  char magic[4];
+  while (true) {
+    fin.read(magic, sizeof(magic));
+    if (!fin.good()) break;
+    // Only count tuples whose header Read() would accept.
+    if (magic[0] != kMagicWord[0] || magic[1] != kMagicWord[1]) break;
+    if (magic[2] != 0 && magic[2] != 1) break;
+    if (magic[2] == 1 && !skip_field()) break;  // key_len + key
+    if (!skip_field()) break;                   // val_len + val
+    ++count;
+  }
+  return count;
+}
+
+/// Read one length-prefixed field (size_t length, then bytes) into
+/// `*content`. Returns false if the field is incomplete or cannot fit in the
+/// read buffer.
+bool BinFileReader::ReadField(std::string* content) {
+  content->clear();
+  const int ssize = sizeof(size_t);
+  if (!PrepareNextField(ssize)) return false;
+  // memcpy instead of dereferencing a reinterpret_cast pointer: buf_ + offset_
+  // is not guaranteed to be suitably aligned for size_t.
+  size_t len = 0;
+  memcpy(&len, buf_ + offset_, ssize);
+  offset_ += ssize;
+  // A field larger than the buffer can never be read in full (and would not
+  // fit the int arithmetic below), so treat it as a failure.
+  if (len > static_cast<size_t>(capacity_)) return false;
+  const int ilen = static_cast<int>(len);
+  if (!PrepareNextField(ilen)) return false;
+  content->assign(buf_ + offset_, ilen);
+  offset_ += ilen;
+  return true;
+}
+
+/// Ensure at least `size` unread bytes are buffered, refilling from disk
+/// when the buffer does not hold the next complete field.
+/// Returns false if the file ends before `size` bytes are available.
+bool BinFileReader::PrepareNextField(int size) {
+  if (offset_ + size <= bufsize_) return true;
+  // Move the unread tail to the front of the buffer. Source and destination
+  // may overlap, so memmove (not memcpy) is required.
+  bufsize_ -= offset_;
+  memmove(buf_, buf_ + offset_, bufsize_);
+  offset_ = 0;
+  if (fdat_.eof()) return false;
+  fdat_.read(buf_ + bufsize_, capacity_ - bufsize_);
+  bufsize_ += fdat_.gcount();
+  return size <= bufsize_;
+}
+
+}  // namespace io
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/src/io/binfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/io/binfile_writer.cc b/src/io/binfile_writer.cc
new file mode 100644
index 0000000..b1d7951
--- /dev/null
+++ b/src/io/binfile_writer.cc
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "singa/io/writer.h"
+#include "singa/utils/logging.h"
+
+namespace singa {
+namespace io {
+/// Open `path` for writing with a `capacity`-byte write buffer.
+/// kCreate truncates/creates the file; kAppend requires an existing file and
+/// resumes after its last complete tuple. Returns true if the file is open.
+bool BinFileWriter::Open(const std::string& path, Mode mode, int capacity) {
+  CHECK(!fdat_.is_open());
+  CHECK(capacity > 0) << "Buffer capacity must be positive, got " << capacity;
+  path_ = path;
+  mode_ = mode;
+  capacity_ = capacity;
+  // Free a buffer left by an earlier Open() whose stream failed to open.
+  delete[] buf_;
+  buf_ = new char[capacity_];
+  bufsize_ = 0;
+  // Reset error flags from a previous session before reopening.
+  fdat_.clear();
+  switch (mode) {
+    case kCreate:
+      fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::trunc);
+      CHECK(fdat_.is_open()) << "Cannot create file " << path_;
+      break;
+    case kAppend:
+      // The file must already exist.
+      fdat_.open(path_, std::ios::in | std::ios::binary);
+      CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+      fdat_.close();
+      {
+        // Resume right after the last complete tuple so that a partially
+        // written trailing tuple is overwritten.
+        const int last_tuple = PrepareForAppend(path_);
+        fdat_.open(path_, std::ios::binary | std::ios::out | std::ios::in |
+                              std::ios::ate);
+        CHECK(fdat_.is_open()) << "Cannot open file " << path_;
+        fdat_.seekp(last_tuple);
+      }
+      break;
+    default:
+      LOG(FATAL) << "unknown model to open KVFile " << mode;
+      break;
+  }
+  return fdat_.is_open();
+}
+
+/// Flush buffered tuples, free the write buffer and close the file.
+/// Safe to call more than once.
+void BinFileWriter::Close() {
+  // Write out anything still buffered before the stream goes away.
+  if (fdat_.is_open()) Flush();
+  // buf_ is allocated with new[], so it must be released with delete[];
+  // a plain delete here is undefined behavior.
+  delete[] buf_;
+  buf_ = nullptr;
+  if (fdat_.is_open()) fdat_.close();
+}
+
+/// Buffer one tuple; the buffer is written to disk when it would overflow.
+/// Returns false (writing nothing) if `value` is empty.
+bool BinFileWriter::Write(const std::string& key, const std::string& value) {
+  CHECK(fdat_.is_open()) << "File not open!";
+  if (value.size() == 0) return false;
+  // Tuple layout: magic_word + (key_len + key) + val_len + val
+  char magic[4];
+  memcpy(magic, kMagicWord, sizeof(kMagicWord));
+  const bool has_key = !key.empty();
+  magic[2] = has_key ? 1 : 0;  // flags whether key_len and key follow
+  magic[3] = 0;                // reserved
+  // Computed in size_t so that large keys/values cannot overflow an int.
+  size_t size = sizeof(magic) + sizeof(size_t) + value.size();
+  if (has_key) size += sizeof(size_t) + key.size();
+
+  const size_t capacity = static_cast<size_t>(capacity_);
+  if (bufsize_ + size > capacity) {
+    fdat_.write(buf_, bufsize_);
+    bufsize_ = 0;
+    CHECK_LE(size, capacity) << "Tuple size is larger than capacity "
+                             << "Try a larger capacity size";
+  }
+
+  // Append raw bytes to the buffer. memcpy is used for the length prefixes
+  // too, because buf_ + bufsize_ is not necessarily aligned for size_t.
+  auto append = [this](const void* src, size_t n) {
+    memcpy(buf_ + bufsize_, src, n);
+    bufsize_ += static_cast<int>(n);
+  };
+  append(magic, sizeof(magic));
+  if (has_key) {
+    const size_t key_len = key.size();
+    append(&key_len, sizeof(key_len));
+    append(key.data(), key_len);
+  }
+  const size_t val_len = value.size();
+  append(&val_len, sizeof(val_len));
+  append(value.data(), val_len);
+  return true;
+}
+
+/// Write every buffered byte to the file and flush the stream.
+void BinFileWriter::Flush() {
+  CHECK(fdat_);
+  // Nothing buffered means nothing to write.
+  if (bufsize_ <= 0) return;
+  fdat_.write(buf_, bufsize_);
+  fdat_.flush();
+  bufsize_ = 0;
+}
+
+/// Scan `path` and return the byte offset just past its last complete,
+/// well-formed tuple (0 if the file cannot be opened or has none).
+int BinFileWriter::PrepareForAppend(const std::string& path) {
+  std::ifstream fin(path, std::ios::in | std::ios::binary);
+  if (!fin.is_open()) return 0;
+  // Seeking past EOF does not set failbit, so compare positions against the
+  // real file size to detect a tuple truncated by a crashed write.
+  fin.seekg(0, std::ios_base::end);
+  const std::streamoff file_size = fin.tellg();
+  fin.seekg(0, std::ios_base::beg);
+
+  // Read a size_t length prefix and skip that many bytes; returns false if
+  // the prefix or the payload is incomplete. Payloads are skipped rather
+  // than read into a fixed-size stack buffer, which overflowed for keys of
+  // 256 bytes or more.
+  auto skip_field = [&fin, file_size]() -> bool {
+    size_t len = 0;
+    fin.read(reinterpret_cast<char*>(&len), sizeof(len));
+    if (!fin.good()) return false;
+    const std::streamoff pos = fin.tellg();
+    if (pos < 0 || pos > file_size ||
+        len > static_cast<size_t>(file_size - pos))
+      return false;
+    fin.seekg(static_cast<std::streamoff>(len), std::ios_base::cur);
+    return fin.good();
+  };
+
+  int last_tuple_offset = 0;
+  char magic[4];
+  while (true) {
+    fin.read(magic, sizeof(magic));
+    if (!fin.good()) break;
+    // Stop at the first tuple whose header is not valid.
+    if (magic[0] != kMagicWord[0] || magic[1] != kMagicWord[1]) break;
+    if (magic[2] != 0 && magic[2] != 1) break;
+    if (magic[2] == 1 && !skip_field()) break;  // key_len + key
+    if (!skip_field()) break;                   // val_len + val
+    last_tuple_offset = static_cast<int>(fin.tellg());
+  }
+  return last_tuple_offset;
+}
+}  // namespace io
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/913f45ce/test/singa/test_binfile_rw.cc
----------------------------------------------------------------------
diff --git a/test/singa/test_binfile_rw.cc b/test/singa/test_binfile_rw.cc
new file mode 100644
index 0000000..45afd56
--- /dev/null
+++ b/test/singa/test_binfile_rw.cc
@@ -0,0 +1,95 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include <cstdio>
+
+#include "../include/singa/io/reader.h"
+#include "../include/singa/io/writer.h"
+#include "gtest/gtest.h"
+
+// Scratch file shared by the tests below; it is removed at the end of the
+// BinFileReader.Read test.
+const char* const path = "./binfile_test";
+using singa::io::BinFileReader;
+using singa::io::BinFileWriter;
+// Creates the file with two identical key-less tuples.
+TEST(BinFileWriter, Create) {
+  BinFileWriter writer;
+  EXPECT_TRUE(writer.Open(path, singa::io::kCreate));
+
+  const std::string key;
+  const std::string value = "\nThis is a test for binfile io.";
+  for (int i = 0; i < 2; ++i) EXPECT_TRUE(writer.Write(key, value));
+
+  writer.Flush();
+  writer.Close();
+}
+
+// Appends two keyed tuples ("1" and "2") sharing the same payload.
+TEST(BinFileWriter, Append) {
+  BinFileWriter writer;
+  EXPECT_TRUE(writer.Open(path, singa::io::kAppend));
+
+  const std::string value = "\nThis is another test for binfile io.";
+  for (const char* key : {"1", "2"}) EXPECT_TRUE(writer.Write(key, value));
+
+  writer.Flush();
+  writer.Close();
+}
+
+// Reads back the four tuples written by the Create and Append tests, then
+// deletes the scratch file.
+TEST(BinFileReader, Read) {
+  BinFileReader reader;
+  EXPECT_TRUE(reader.Open(path));
+
+  EXPECT_EQ(4, reader.Count());
+
+  std::string key, value;
+  // Check every Read() result so a decoding failure is reported instead of
+  // silently comparing strings left over from an earlier read.
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("", key.c_str());
+  EXPECT_STREQ("\nThis is a test for binfile io.", value.c_str());
+
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("", key.c_str());
+  EXPECT_STREQ("\nThis is a test for binfile io.", value.c_str());
+
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("1", key.c_str());
+  EXPECT_STREQ("\nThis is another test for binfile io.", value.c_str());
+
+  EXPECT_TRUE(reader.Read(&key, &value));
+  EXPECT_STREQ("2", key.c_str());
+  EXPECT_STREQ("\nThis is another test for binfile io.", value.c_str());
+
+  // Every tuple has been consumed, so further reads must fail.
+  EXPECT_FALSE(reader.Read(&key, &value));
+
+  reader.Close();
+  std::remove(path);
+}


Mime
View raw message