arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject arrow git commit: ARROW-1227: [GLib] Support GOutputStream
Date Mon, 17 Jul 2017 12:33:32 GMT
Repository: arrow
Updated Branches:
  refs/heads/master d53842691 -> 8644ee177


ARROW-1227: [GLib] Support GOutputStream

Author: Kouhei Sutou <kou@clear-code.com>

Closes #856 from kou/glib-support-goutput-stream and squashes the following commits:

955fecc [Kouhei Sutou] [GLib] Support GOutputStream


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/8644ee17
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/8644ee17
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/8644ee17

Branch: refs/heads/master
Commit: 8644ee1778b3512d15ced89adacde1a3cafb50c0
Parents: d538426
Author: Kouhei Sutou <kou@clear-code.com>
Authored: Mon Jul 17 14:33:28 2017 +0200
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Mon Jul 17 14:33:28 2017 +0200

----------------------------------------------------------------------
 c_glib/arrow-glib/error.cpp           |  16 ++++
 c_glib/arrow-glib/error.hpp           |   3 +
 c_glib/arrow-glib/input-stream.cpp    |  42 +++++----
 c_glib/arrow-glib/output-stream.cpp   | 142 +++++++++++++++++++++++++++++
 c_glib/arrow-glib/output-stream.h     |  52 ++++++++++-
 c_glib/test/test-gio-output-stream.rb |  53 +++++++++++
 6 files changed, 287 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/8644ee17/c_glib/arrow-glib/error.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/error.cpp b/c_glib/arrow-glib/error.cpp
index e5d2ad6..298f5df 100644
--- a/c_glib/arrow-glib/error.cpp
+++ b/c_glib/arrow-glib/error.cpp
@@ -23,6 +23,9 @@
 
 #include <arrow-glib/error.hpp>
 
+#include <iostream>
+#include <sstream>
+
 G_BEGIN_DECLS
 
 /**
@@ -80,3 +83,16 @@ garrow_error_check(GError **error,
     return FALSE;
   }
 }
+
+/*
+ * Builds an arrow::Status carrying @code from a GError.
+ *
+ * The status message is formatted as
+ * "<context>: <error domain>(<error code>): <error message>".
+ *
+ * Ownership of @error is transferred to this function: it is released
+ * with g_error_free() before returning, so callers must not touch it
+ * afterwards.
+ */
+arrow::Status
+garrow_error_to_status(GError *error,
+                       arrow::StatusCode code,
+                       const char *context)
+{
+  const char *domain_name = g_quark_to_string(error->domain);
+  std::ostringstream formatted;
+  formatted << context
+            << ": " << domain_name
+            << "(" << error->code << "): "
+            << error->message;
+  /* Every field of the GError has been copied out above; free it now. */
+  g_error_free(error);
+  return arrow::Status(code, formatted.str());
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/8644ee17/c_glib/arrow-glib/error.hpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/error.hpp b/c_glib/arrow-glib/error.hpp
index dad27bd..4149365 100644
--- a/c_glib/arrow-glib/error.hpp
+++ b/c_glib/arrow-glib/error.hpp
@@ -26,3 +26,6 @@
 gboolean garrow_error_check(GError **error,
                             const arrow::Status &status,
                             const char *context);
+/*
+ * Converts @error into an arrow::Status with status code @code, using
+ * @context as the message prefix. Takes ownership of @error: it is
+ * freed with g_error_free() before this function returns.
+ */
+arrow::Status garrow_error_to_status(GError *error,
+                                     arrow::StatusCode code,
+                                     const char *context);

http://git-wip-us.apache.org/repos/asf/arrow/blob/8644ee17/c_glib/arrow-glib/input-stream.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index 40f0c94..645ce8a 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -32,9 +32,6 @@
 #include <arrow-glib/readable.hpp>
 #include <arrow-glib/tensor.hpp>
 
-#include <iostream>
-#include <sstream>
-
 G_BEGIN_DECLS
 
 /**
@@ -52,6 +49,9 @@ G_BEGIN_DECLS
  *
  * #GArrowMemoryMappedFile is a class to read data in file by mapping
  * the file on memory. It supports zero copy.
+ *
+ * #GArrowGIOInputStream is a class for `GInputStream` based input
+ * stream.
  */
 
 typedef struct GArrowInputStreamPrivate_ {
@@ -399,7 +399,9 @@ namespace garrow {
       if (g_input_stream_close(input_stream_, NULL, &error)) {
         return arrow::Status::OK();
       } else {
-        return io_error_to_status(error, "[gio-input-stream][close]");
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-input-stream][close]");
       }
     }
 
@@ -426,7 +428,9 @@ namespace garrow {
                                           NULL,
                                           &error);
       if (*n_read_bytes == -1) {
-        return io_error_to_status(error, "[gio-input-stream][read]");
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-input-stream][read]");
       } else {
         return arrow::Status::OK();
       }
@@ -445,7 +449,9 @@ namespace garrow {
                                               NULL,
                                               &error);
       if (n_read_bytes == -1) {
-        return io_error_to_status(error, "[gio-input-stream][read][buffer]");
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-input-stream][read][buffer]");
       } else {
         if (n_read_bytes < n_bytes) {
           ARROW_RETURN_NOT_OK(buffer->Resize(n_read_bytes));
@@ -472,7 +478,9 @@ namespace garrow {
                           &error)) {
         return arrow::Status::OK();
       } else {
-        return io_error_to_status(error, "[gio-input-stream][seek]");
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-input-stream][seek]");
       }
     }
 
@@ -492,7 +500,9 @@ namespace garrow {
                            G_SEEK_END,
                            NULL,
                            &error)) {
-        return io_error_to_status(error, "[gio-input-stream][size][seek]");
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-input-stream][size][seek]");
       }
       *size = g_seekable_tell(G_SEEKABLE(input_stream_));
       if (!g_seekable_seek(G_SEEKABLE(input_stream_),
@@ -500,8 +510,9 @@ namespace garrow {
                            G_SEEK_SET,
                            NULL,
                            &error)) {
-        return io_error_to_status(error,
-                                  "[gio-input-stream][size][seek][restore]");
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-input-stream][size][seek][restore]");
       }
       return arrow::Status::OK();
     }
@@ -512,15 +523,6 @@ namespace garrow {
 
   private:
     GInputStream *input_stream_;
-
-    arrow::Status io_error_to_status(GError *error, const char *context) {
-      std::stringstream message;
-      message << context << ": " << g_quark_to_string(error->domain);
-      message << "(" << error->code << "): ";
-      message << error->message;
-      g_error_free(error);
-      return arrow::Status::IOError(message.str());
-    }
   };
 };
 
@@ -544,7 +546,7 @@ garrow_gio_input_stream_class_init(GArrowGIOInputStreamClass *klass)
  * garrow_gio_input_stream_new:
  * @gio_input_stream: The stream to be read.
  *
- * Returns: A newly created #GArrowGIOInputStream.
+ * Returns: (transfer full): A newly created #GArrowGIOInputStream.
  *
  * Since: 0.5.0
  */

http://git-wip-us.apache.org/repos/asf/arrow/blob/8644ee17/c_glib/arrow-glib/output-stream.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp
index ffb6fec..7f37864 100644
--- a/c_glib/arrow-glib/output-stream.cpp
+++ b/c_glib/arrow-glib/output-stream.cpp
@@ -31,6 +31,9 @@
 #include <arrow-glib/tensor.hpp>
 #include <arrow-glib/writeable.hpp>
 
+#include <iostream>
+#include <sstream>
+
 G_BEGIN_DECLS
 
 /**
@@ -45,6 +48,9 @@ G_BEGIN_DECLS
  * #GArrowFileOutputStream is a class for file output stream.
  *
  * #GArrowBufferOutputStream is a class for buffer output stream.
+ *
+ * #GArrowGIOOutputStream is a class for `GOutputStream` based output
+ * stream.
  */
 
 typedef struct GArrowOutputStreamPrivate_ {
@@ -275,8 +281,144 @@ garrow_buffer_output_stream_new(GArrowResizableBuffer *buffer)
     std::make_shared<arrow::io::BufferOutputStream>(arrow_resizable_buffer);
   return garrow_buffer_output_stream_new_raw(&arrow_buffer_output_stream);
 }
+
 G_END_DECLS
 
+
+namespace garrow {
+  /*
+   * An arrow::io::OutputStream implementation that forwards every
+   * operation to a GIO GOutputStream.
+   *
+   * The wrapped stream is referenced with g_object_ref() on construction
+   * and released with g_object_unref() on destruction. Copying is
+   * disabled: an implicit copy would share the raw pointer without taking
+   * its own reference, and the second destructor would over-unref it.
+   */
+  class GIOOutputStream : public arrow::io::OutputStream {
+  public:
+    explicit GIOOutputStream(GOutputStream *output_stream) :
+      output_stream_(output_stream) {
+      g_object_ref(output_stream_);
+    }
+
+    GIOOutputStream(const GIOOutputStream &) = delete;
+    GIOOutputStream &operator=(const GIOOutputStream &) = delete;
+
+    ~GIOOutputStream() {
+      g_object_unref(output_stream_);
+    }
+
+    // Returns the wrapped stream; no additional reference is taken.
+    GOutputStream *get_output_stream() {
+      return output_stream_;
+    }
+
+    arrow::Status Close() override {
+      GError *error = NULL;
+      if (g_output_stream_close(output_stream_, NULL, &error)) {
+        return arrow::Status::OK();
+      } else {
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-output-stream][close]");
+      }
+    }
+
+    // Reports the current position. Only streams implementing GSeekable
+    // can answer this; others yield a NotImplemented status naming the
+    // concrete GObject class.
+    arrow::Status Tell(int64_t *position) override {
+      if (!G_IS_SEEKABLE(output_stream_)) {
+        std::string message("[gio-output-stream][tell] "
+                            "not seekable output stream: <");
+        message += G_OBJECT_CLASS_NAME(G_OBJECT_GET_CLASS(output_stream_));
+        message += ">";
+        return arrow::Status::NotImplemented(message);
+      }
+
+      *position = g_seekable_tell(G_SEEKABLE(output_stream_));
+      return arrow::Status::OK();
+    }
+
+    // Delegates to g_output_stream_write_all(). On failure the status
+    // message records how many of the n_bytes bytes were written.
+    arrow::Status Write(const uint8_t *data,
+                        int64_t n_bytes) override {
+      GError *error = NULL;
+      gsize n_written_bytes = 0;
+      auto succeeded = g_output_stream_write_all(output_stream_,
+                                                 data,
+                                                 n_bytes,
+                                                 &n_written_bytes,
+                                                 NULL,
+                                                 &error);
+      if (succeeded) {
+        return arrow::Status::OK();
+      } else {
+        // Start from an empty stream: a stringstream constructed from a
+        // string begins writing at position 0, so a later "<<" would
+        // overwrite that initial prefix instead of appending to it.
+        std::stringstream context;
+        context << "[gio-output-stream][write]";
+        context << "[" << n_written_bytes << "/" << n_bytes << "]";
+        // The temporary from str() lives until the end of this full
+        // expression; garrow_error_to_status() copies it before returning.
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      context.str().c_str());
+      }
+    }
+
+    arrow::Status Flush() override {
+      GError *error = NULL;
+      auto succeeded = g_output_stream_flush(output_stream_, NULL, &error);
+      if (succeeded) {
+        return arrow::Status::OK();
+      } else {
+        return garrow_error_to_status(error,
+                                      arrow::StatusCode::IOError,
+                                      "[gio-output-stream][flush]");
+      }
+    }
+
+  private:
+    GOutputStream *output_stream_;
+  };
+};
+
+G_BEGIN_DECLS
+
+/* Registers GArrowGIOOutputStream as a subclass of GArrowOutputStream. */
+G_DEFINE_TYPE(GArrowGIOOutputStream,
+              garrow_gio_output_stream,
+              GARROW_TYPE_OUTPUT_STREAM);
+
+/*
+ * No per-instance state. This class installs no properties of its own,
+ * so the "output-stream" value given to g_object_new() is handled by an
+ * ancestor class.
+ */
+static void
+garrow_gio_output_stream_init(GArrowGIOOutputStream *object)
+{
+}
+
+/* No virtual methods or properties are overridden. */
+static void
+garrow_gio_output_stream_class_init(GArrowGIOOutputStreamClass *gio_class)
+{
+}
+
+/**
+ * garrow_gio_output_stream_new:
+ * @gio_output_stream: The #GOutputStream to be written to.
+ *
+ * Returns: (transfer full): A newly created #GArrowGIOOutputStream.
+ *
+ * Since: 0.5.0
+ */
+GArrowGIOOutputStream *
+garrow_gio_output_stream_new(GOutputStream *gio_output_stream)
+{
+  g_return_val_if_fail(G_IS_OUTPUT_STREAM(gio_output_stream), NULL);
+
+  /* The C++ wrapper takes its own reference on gio_output_stream. */
+  auto arrow_output_stream =
+    std::make_shared<garrow::GIOOutputStream>(gio_output_stream);
+  /* The "output-stream" property receives a pointer to the shared_ptr. */
+  auto object = g_object_new(GARROW_TYPE_GIO_OUTPUT_STREAM,
+                             "output-stream", &arrow_output_stream,
+                             NULL);
+  return GARROW_GIO_OUTPUT_STREAM(object);
+}
+
+/**
+ * garrow_gio_output_stream_get_raw:
+ * @output_stream: A #GArrowGIOOutputStream.
+ *
+ * Returns: (transfer none): The #GOutputStream wrapped by @output_stream.
+ *
+ * Since: 0.5.0
+ */
+GOutputStream *
+garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream)
+{
+  /*
+   * The downcast is unchecked. It assumes the arrow-level stream is the
+   * garrow::GIOOutputStream installed by garrow_gio_output_stream_new(),
+   * the only constructor of this type in this file.
+   */
+  auto wrapper = std::static_pointer_cast<garrow::GIOOutputStream>(
+    garrow_output_stream_get_raw(GARROW_OUTPUT_STREAM(output_stream)));
+  return wrapper->get_output_stream();
+}
+
+G_END_DECLS
+
+
 GArrowOutputStream *
 garrow_output_stream_new_raw(std::shared_ptr<arrow::io::OutputStream> *arrow_output_stream)
 {

http://git-wip-us.apache.org/repos/asf/arrow/blob/8644ee17/c_glib/arrow-glib/output-stream.h
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/output-stream.h b/c_glib/arrow-glib/output-stream.h
index c86597b..e42ebcd 100644
--- a/c_glib/arrow-glib/output-stream.h
+++ b/c_glib/arrow-glib/output-stream.h
@@ -19,7 +19,7 @@
 
 #pragma once
 
-#include <glib-object.h>
+#include <gio/gio.h>
 
 #include <arrow-glib/buffer.h>
 #include <arrow-glib/tensor.h>
@@ -176,4 +176,54 @@ GType garrow_buffer_output_stream_get_type(void) G_GNUC_CONST;
 
 GArrowBufferOutputStream *garrow_buffer_output_stream_new(GArrowResizableBuffer *buffer);
 
+
+/* GType accessor and standard GObject cast/type-check macros. */
+#define GARROW_TYPE_GIO_OUTPUT_STREAM           \
+  (garrow_gio_output_stream_get_type())
+#define GARROW_GIO_OUTPUT_STREAM(obj)                           \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),                            \
+                              GARROW_TYPE_GIO_OUTPUT_STREAM,    \
+                              GArrowGIOOutputStream))
+#define GARROW_GIO_OUTPUT_STREAM_CLASS(klass)                   \
+  (G_TYPE_CHECK_CLASS_CAST((klass),                             \
+                           GARROW_TYPE_GIO_OUTPUT_STREAM,       \
+                           GArrowGIOOutputStreamClass))
+#define GARROW_IS_GIO_OUTPUT_STREAM(obj)                        \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),                            \
+                              GARROW_TYPE_GIO_OUTPUT_STREAM))
+#define GARROW_IS_GIO_OUTPUT_STREAM_CLASS(klass)                \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),                             \
+                           GARROW_TYPE_GIO_OUTPUT_STREAM))
+#define GARROW_GIO_OUTPUT_STREAM_GET_CLASS(obj)                 \
+  (G_TYPE_INSTANCE_GET_CLASS((obj),                             \
+                             GARROW_TYPE_GIO_OUTPUT_STREAM,     \
+                             GArrowGIOOutputStreamClass))
+
+typedef struct _GArrowGIOOutputStream         GArrowGIOOutputStream;
+#ifndef __GTK_DOC_IGNORE__
+typedef struct _GArrowGIOOutputStreamClass    GArrowGIOOutputStreamClass;
+#endif
+
+/**
+ * GArrowGIOOutputStream:
+ *
+ * An output stream that writes its data to a `GOutputStream`.
+ *
+ * Since: 0.5.0
+ */
+struct _GArrowGIOOutputStream
+{
+  /*< private >*/
+  GArrowOutputStream parent_instance;
+};
+
+#ifndef __GTK_DOC_IGNORE__
+struct _GArrowGIOOutputStreamClass
+{
+  GArrowOutputStreamClass parent_class;
+};
+#endif
+
+GType garrow_gio_output_stream_get_type(void) G_GNUC_CONST;
+
+/* Wraps gio_output_stream in a new GArrowGIOOutputStream. */
+GArrowGIOOutputStream *garrow_gio_output_stream_new(GOutputStream *gio_output_stream);
+/* Returns the wrapped GOutputStream without adding a reference. */
+GOutputStream *garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream);
+
 G_END_DECLS

http://git-wip-us.apache.org/repos/asf/arrow/blob/8644ee17/c_glib/test/test-gio-output-stream.rb
----------------------------------------------------------------------
diff --git a/c_glib/test/test-gio-output-stream.rb b/c_glib/test/test-gio-output-stream.rb
new file mode 100644
index 0000000..adaa8c1
--- /dev/null
+++ b/c_glib/test/test-gio-output-stream.rb
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class TestGIOOutputStream < Test::Unit::TestCase
+  def test_writer_backend
+    tempfile = Tempfile.open("arrow-gio-output-stream")
+    file = Gio::File.new_for_path(tempfile.path)
+    output_stream = file.append_to(:none)
+    output = Arrow::GIOOutputStream.new(output_stream)
+    begin
+      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+      schema = Arrow::Schema.new([field])
+      file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
+      begin
+        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        file_writer.write_record_batch(record_batch)
+      ensure
+        file_writer.close
+      end
+    ensure
+      output.close
+    end
+
+    input = Arrow::MemoryMappedInputStream.new(tempfile.path)
+    begin
+      file_reader = Arrow::RecordBatchFileReader.new(input)
+      assert_equal(["enabled"],
+                   file_reader.schema.fields.collect(&:name))
+    ensure
+      input.close
+    end
+  end
+
+  def test_getter
+    output_stream = Gio::MemoryOutputStream.new
+    output = Arrow::GIOOutputStream.new(output_stream)
+    assert_equal(output_stream, output.raw)
+  end
+end


Mime
View raw message