subversion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stef...@apache.org
Subject svn commit: r1700799 - in /subversion/trunk/subversion: include/svn_io.h libsvn_subr/stream.c svnadmin/svnadmin.c svnfsfs/load-index-cmd.c tests/libsvn_subr/stream-test.c
Date Wed, 02 Sep 2015 13:04:52 GMT
Author: stefan2
Date: Wed Sep  2 13:04:51 2015
New Revision: 1700799

URL: http://svn.apache.org/r1700799
Log:
[Combines r1698359 and r170078 into a single commit for better review.]
Introduce a stream wrapper object that adds buffering support to any
readable stream.  Use it on the stdin streams in our CL tools. 

As it turns out, parsing data from stdin byte-by-byte incurs a
massive overhead of 100% internal and 300% system load over a buffered
stream.  'svnadmin load-revprops' sees a 5 times speedup if all data
is in OS disc caches.  This is a realistic assumption in a "final sync
and switch over to new repository" scenario.

The other 2 uses of stdin either have less data to process (svnfsfs
load-index) or parse only a small fraction of the stream (svnadmin load).

* subversion/include/svn_io.h
  (svn_stream_wrap_buffered_read): Declare the new stream constructor API.

* subversion/libsvn_subr/stream.c
  (read_handler_buffering_wrapper,
   data_available_handler_buffering_wrapper,
   is_buffered_handler_buffering_wrapper): Internal logic of the new
                                           stream object.
  (svn_stream_wrap_buffered_read): New constructor implementation.

* subversion/svnadmin/svnadmin.c
  (subcommand_load_revprops): Wrap the stdin stream.

* subversion/svnfsfs/load-index-cmd.c
  (subcommand__load_index): Same.

* subversion/tests/libsvn_subr/stream-test.c
  (struct stream_baton_t,
   read_handler,
   data_available_handler,
   create_test_read_stream): New configurable test read stream.
  (expect_line_content,
   test_stream_buffered_wrapper): New test for the new wrapper stream. 
  (test_funcs): Register the new test.

Modified:
    subversion/trunk/subversion/include/svn_io.h
    subversion/trunk/subversion/libsvn_subr/stream.c
    subversion/trunk/subversion/svnadmin/svnadmin.c
    subversion/trunk/subversion/svnfsfs/load-index-cmd.c
    subversion/trunk/subversion/tests/libsvn_subr/stream-test.c

Modified: subversion/trunk/subversion/include/svn_io.h
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/include/svn_io.h?rev=1700799&r1=1700798&r2=1700799&view=diff
==============================================================================
--- subversion/trunk/subversion/include/svn_io.h (original)
+++ subversion/trunk/subversion/include/svn_io.h Wed Sep  2 13:04:51 2015
@@ -1157,6 +1157,18 @@ svn_stream_t *
 svn_stream_from_string(const svn_string_t *str,
                        apr_pool_t *pool);
 
+/** Return a generic read-only stream that forwards data from @a inner but
+ * uses buffering for efficiency.  Allocate the stream in @a result_pool.
+ *
+ * @note The returned stream supports neither writing nor mark and seek,
+ * even if @a inner does support those functions.
+ *
+ * @since New in 1.10.
+ */
+svn_stream_t *
+svn_stream_wrap_buffered_read(svn_stream_t *inner,
+                              apr_pool_t *result_pool);
+
 /** Return a generic stream which implements buffered reads and writes.
  *  The stream will preferentially store data in-memory, but may use
  *  disk storage as backup if the amount of data is large.

Modified: subversion/trunk/subversion/libsvn_subr/stream.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_subr/stream.c?rev=1700799&r1=1700798&r2=1700799&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_subr/stream.c (original)
+++ subversion/trunk/subversion/libsvn_subr/stream.c Wed Sep  2 13:04:51 2015
@@ -1710,6 +1710,114 @@ svn_stream_from_string(const svn_string_
   return stream;
 }
 
+/* Baton structure for buffering read stream wrappers.
+ *
+ * We read from INNER and append the data to BUFFER.  From BUFFER, we serve
+ * read requests. Old buffer contents get discarded once they are no longer
+ * needed.
+ */
+struct buffering_stream_wrapper_baton
+{
+  /* Our data source. */
+  svn_stream_t *inner;
+
+  /* Contains the data pre-read from INNER. Some of this may already have
+   * been delivered. */
+  svn_stringbuf_t *buffer;
+
+  /* Current read position relative to the start of BUFFER->DATA. */
+  apr_size_t buffer_pos;
+};
+
+/* Implements svn_stream_t.read_fn for buffering read stream wrappers. */
+static svn_error_t *
+read_handler_buffering_wrapper(void *baton,
+                               char *buffer,
+                               apr_size_t *len)
+{
+  struct buffering_stream_wrapper_baton *btn = baton;
+  apr_size_t left_to_read = btn->buffer->len - btn->buffer_pos;
+
+  /* This is the "normal" and potentially incomplete read function.
+   * So, we only need to replenish our buffers if we ran completely dry. */
+  if (left_to_read == 0)
+    {
+      apr_size_t count = btn->buffer->blocksize;
+
+      /* Read from the INNER stream. */
+      SVN_ERR(svn_stream_read2(btn->inner, btn->buffer->data, &count));
+      btn->buffer->len = count;
+      btn->buffer_pos = 0;
+
+      /* We may now have more data that we could return. */
+      left_to_read = btn->buffer->len;
+    }
+
+  /* Cap the read request to what we can deliver from the buffer. */
+  if (left_to_read < *len)
+    *len = left_to_read;
+
+  /* Copy the data from the buffer and move the read pointer accordingly. */
+  memcpy(buffer, btn->buffer->data + btn->buffer_pos, *len);
+  btn->buffer_pos += *len;
+
+  return SVN_NO_ERROR;
+}
+
+/* Implements svn_stream_t.data_available_fn for buffering read stream
+ * wrappers. */
+static svn_error_t *
+data_available_handler_buffering_wrapper(void *baton,
+                                         svn_boolean_t *data_available)
+{
+  /* If we still have some unread data, this becomes easy to answer. */
+  struct buffering_stream_wrapper_baton *btn = baton;
+  if (btn->buffer->len > btn->buffer_pos)
+    {
+      *data_available = TRUE;
+      return SVN_NO_ERROR;
+    }
+
+  /* Otherwise, because we would always read from the inner stream's current
+   * position to fill the buffer, asking the inner stream when the buffer is
+   * exhausted gives the correct answer. */
+  return svn_error_trace(svn_stream_data_available(btn->inner,
+                                                   data_available));
+}
+
+/* Implements svn_stream_t.is_buffered_fn for buffering read stream wrappers.
+ */
+static svn_boolean_t
+is_buffered_handler_buffering_wrapper(void *baton)
+{
+  return TRUE;
+}
+
+svn_stream_t *
+svn_stream_wrap_buffered_read(svn_stream_t *inner,
+                              apr_pool_t *result_pool)
+{
+  svn_stream_t *stream;
+  struct buffering_stream_wrapper_baton *baton;
+
+  /* Create the wrapper stream state.
+   * The buffer is empty and we are at position 0. */
+  baton = apr_pcalloc(result_pool, sizeof(*baton));
+  baton->inner = inner;
+  baton->buffer = svn_stringbuf_create_ensure(SVN__STREAM_CHUNK_SIZE,
+                                              result_pool);
+  baton->buffer_pos = 0;
+
+  /* Create the wrapper stream object and set up the vtable. */
+  stream = svn_stream_create(baton, result_pool);
+  svn_stream_set_read2(stream, read_handler_buffering_wrapper, NULL);
+  svn_stream_set_data_available(stream,
+                                data_available_handler_buffering_wrapper);
+  svn_stream__set_is_buffered(stream, is_buffered_handler_buffering_wrapper);
+
+  return stream;
+}
+
 
 svn_error_t *
 svn_stream_for_stdin(svn_stream_t **in, apr_pool_t *pool)

Modified: subversion/trunk/subversion/svnadmin/svnadmin.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/svnadmin/svnadmin.c?rev=1700799&r1=1700798&r2=1700799&view=diff
==============================================================================
--- subversion/trunk/subversion/svnadmin/svnadmin.c (original)
+++ subversion/trunk/subversion/svnadmin/svnadmin.c Wed Sep  2 13:04:51 2015
@@ -1548,6 +1548,7 @@ subcommand_load_revprops(apr_getopt_t *o
 
   /* Read the stream from STDIN.  Users can redirect a file. */
   SVN_ERR(svn_stream_for_stdin(&stdin_stream, pool));
+  stdin_stream = svn_stream_wrap_buffered_read(stdin_stream, pool);
 
   /* Progress feedback goes to STDOUT, unless they asked to suppress it. */
   if (! opt_state->quiet)

Modified: subversion/trunk/subversion/svnfsfs/load-index-cmd.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/svnfsfs/load-index-cmd.c?rev=1700799&r1=1700798&r2=1700799&view=diff
==============================================================================
--- subversion/trunk/subversion/svnfsfs/load-index-cmd.c (original)
+++ subversion/trunk/subversion/svnfsfs/load-index-cmd.c Wed Sep  2 13:04:51 2015
@@ -187,6 +187,7 @@ subcommand__load_index(apr_getopt_t *os,
   svn_stream_t *input;
 
   SVN_ERR(svn_stream_for_stdin(&input, pool));
+  input = svn_stream_wrap_buffered_read(input, pool);
   SVN_ERR(load_index(opt_state->repository_path, input, pool));
 
   return SVN_NO_ERROR;

Modified: subversion/trunk/subversion/tests/libsvn_subr/stream-test.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/tests/libsvn_subr/stream-test.c?rev=1700799&r1=1700798&r2=1700799&view=diff
==============================================================================
--- subversion/trunk/subversion/tests/libsvn_subr/stream-test.c (original)
+++ subversion/trunk/subversion/tests/libsvn_subr/stream-test.c Wed Sep  2 13:04:51 2015
@@ -32,6 +32,75 @@
 
 #include "../svn_test.h"
 
+struct stream_baton_t
+{
+  svn_filesize_t capacity_left;
+  char current;
+  apr_size_t max_read;
+};
+
+/* Implements svn_stream_t.read_fn. */
+static svn_error_t *
+read_handler(void *baton,
+             char *buffer,
+             apr_size_t *len)
+{
+  struct stream_baton_t *btn = baton;
+  int i;
+
+  /* Cap the read request to what we actually support. */
+  if (btn->max_read < *len)
+    *len = btn->max_read;
+  if (btn->capacity_left < *len)
+    *len = (apr_size_t)btn->capacity_left;
+
+  /* Produce output */
+  for (i = 0; i < *len; ++i)
+    {
+      buffer[i] = btn->current + 1;
+      btn->current = (btn->current + 1) & 0x3f;
+
+      btn->capacity_left--;
+    }
+
+  return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+data_available_handler(void *baton,
+                       svn_boolean_t *data_available)
+{
+  struct stream_baton_t *btn = baton;
+  *data_available = btn->capacity_left > 0;
+
+  return SVN_NO_ERROR;
+}
+
+/* Return a stream that produces CAPACITY characters in chunks of at most
+ * MAX_READ chars.  The first char will be '\1' followed by '\2' etc. up
+ * to '\x40' and then repeating the cycle until the end of the stream.
+ * Allocate the result in RESULT_POOL. */
+static svn_stream_t *
+create_test_read_stream(svn_filesize_t capacity,
+                        apr_size_t max_read,
+                        apr_pool_t *result_pool)
+{
+  svn_stream_t *stream;
+  struct stream_baton_t *baton;
+
+  baton = apr_pcalloc(result_pool, sizeof(*baton));
+  baton->capacity_left = capacity;
+  baton->current = 0;
+  baton->max_read = max_read;
+
+  stream = svn_stream_create(baton, result_pool);
+  svn_stream_set_read2(stream, read_handler, NULL);
+  svn_stream_set_data_available(stream, data_available_handler);
+
+  return stream;
+}
+
+/*------------------------ Tests --------------------------- */
 
 static svn_error_t *
 test_stream_from_string(apr_pool_t *pool)
@@ -803,6 +872,70 @@ test_stream_compressed_read_full(apr_poo
   return SVN_NO_ERROR;
 }
 
+/* Utility function verifying that LINE contains LENGTH characters read
+ * from a stream returned by create_test_read_stream().  START is the first
+ * character expected in LINE. */
+static svn_error_t *
+expect_line_content(svn_stringbuf_t *line,
+                    char start,
+                    apr_size_t length)
+{
+  apr_size_t i;
+  char c = start - 1;
+
+  SVN_TEST_ASSERT(line->len == length);
+  for (i = 0; i < length; ++i)
+    {
+      SVN_TEST_ASSERT(line->data[i] == c + 1);
+      c = (c + 1) & 0x3f;
+    }
+
+  return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+test_stream_buffered_wrapper(apr_pool_t *pool)
+{
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  svn_stringbuf_t *line;
+  svn_boolean_t eof = FALSE;
+  apr_size_t read = 0;
+
+  /* At least a few stream chunks (16k) worth of data. */
+  const apr_size_t stream_length = 100000;
+
+  /* Our source stream delivers data in very small chunks only.
+   * This requires multiple reads per line while readline will hold marks
+   * etc. */
+  svn_stream_t *stream = create_test_read_stream(stream_length, 19, pool);
+  stream = svn_stream_wrap_buffered_read(stream, pool);
+
+  /* The wrapper stream does not support seeking, not even to the start. */
+  SVN_TEST_ASSERT_ERROR(svn_stream_seek(stream, NULL),
+                        SVN_ERR_STREAM_SEEK_NOT_SUPPORTED);
+
+  /* Read all lines. Check EOF detection. */
+  while (!eof)
+    {
+      /* The local pool ensures that marks get cleaned up. */
+      svn_pool_clear(iterpool);
+      SVN_ERR(svn_stream_readline(stream, &line, "\n", &eof, iterpool));
+
+      /* Verify that we read the correct data and the full stream. */
+      if (read == 0)
+        SVN_ERR(expect_line_content(line, 1, '\n' - 1));
+      else if (eof)
+        SVN_ERR(expect_line_content(line, '\n' + 1, stream_length - read));
+      else
+        SVN_ERR(expect_line_content(line, '\n' + 1, 63));
+
+      /* Update bytes read. */
+      read += line->len + 1;
+    }
+
+  return SVN_NO_ERROR;
+}
+
 /* The test table.  */
 
 static int max_threads = 1;
@@ -834,6 +967,8 @@ static struct svn_test_descriptor_t test
                    "test svn_stringbuf_from_stream"),
     SVN_TEST_PASS2(test_stream_compressed_read_full,
                    "test compression for streams without partial read"),
+    SVN_TEST_PASS2(test_stream_buffered_wrapper,
+                   "test buffering read stream wrapper"),
     SVN_TEST_NULL
   };
 



Mime
View raw message