subversion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stef...@apache.org
Subject svn commit: r1438453 - /subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c
Date Fri, 25 Jan 2013 11:15:05 GMT
Author: stefan2
Date: Fri Jan 25 11:15:05 2013
New Revision: 1438453

URL: http://svn.apache.org/viewvc?rev=1438453&view=rev
Log:
On the fsfs-format7 branch:  Implement layout optimizing fsfs packing
for format7 repos.  Code works but still needs documentation and cleanup.

* subversion/libsvn_fs_fs/pack.c
  (rep_info_t,
   pack_context_t): new data types
  (initialize_pack_context,
   reset_pack_context,
   close_pack_context,
   copy_file_data,
   copy_item_to_temp,
   get_item_array_index,
   add_item_rep_mapping,
   copy_rep_to_temp,
   compare_dir_entries,
   copy_node_to_temp,
   compare_p2l_info,
   sort_items,
   compare_p2l_info_rev,
   sort_by_rev,
   pick_recursively,
   sort_reps,
   copy_items_from_temp,
   append_entries,
   write_l2p_index,
   pack_section,
   append_revision): various new utility functions
  (pack_log_addressed): new main pack function for format7 repos
  (pack_phys_addressed): new main pack function for format6 repos
   factored out from pack_rev_shard
  (copy_indexes): drop obsolete function
  (pack_rev_shard): simplify; call the suitable pack sub-function
  (pack_shard): provide extra parameter

Modified:
    subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c

Modified: subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c
URL: http://svn.apache.org/viewvc/subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c?rev=1438453&r1=1438452&r2=1438453&view=diff
==============================================================================
--- subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c (original)
+++ subversion/branches/fsfs-format7/subversion/libsvn_fs_fs/pack.c Fri Jan 25 11:15:05 2013
@@ -19,8 +19,11 @@
  *    under the License.
  * ====================================================================
  */
+#include <assert.h>
+
 #include "svn_pools.h"
 #include "svn_dirent_uri.h"
+#include "svn_sorts.h"
 #include "private/svn_temp_serializer.h"
 
 #include "fs_fs.h"
@@ -29,12 +32,856 @@
 #include "revprops.h"
 #include "transaction.h"
 #include "index.h"
+#include "low_level.h"
+#include "cached_data.h"
 
 #include "../libsvn_fs/fs-loader.h"
 
 #include "svn_private_config.h"
 #include "temp_serializer.h"
 
+typedef struct rep_info_t
+{
+  struct svn_fs_fs__p2l_entry_t *entry;
+  struct rep_info_t *base;
+  struct rep_info_t *next;
+} rep_info_t;
+
+typedef struct pack_context_t
+{
+  svn_fs_t *fs;
+  svn_cancel_func_t cancel_func;
+  void *cancel_baton;
+
+  svn_revnum_t shard_rev;
+  svn_revnum_t start_rev;
+  svn_revnum_t end_rev;
+  svn_revnum_t shard_end_rev;
+  
+  apr_file_t *proto_l2p_index;
+  apr_file_t *proto_p2l_index;
+
+  const char *shard_dir;
+  const char *pack_file_dir;
+  const char *pack_file_path;
+  apr_off_t pack_offset;
+  apr_file_t *pack_file;
+
+  apr_array_header_t *changes;
+  apr_file_t *changes_file;
+  apr_array_header_t *file_props;
+  apr_file_t *file_props_file;
+  apr_array_header_t *dir_props;
+  apr_file_t *dir_props_file;
+  
+  apr_array_header_t *rev_offsets;
+  apr_array_header_t *reps_infos;
+  apr_array_header_t *reps;
+  apr_file_t *reps_file;
+
+  apr_pool_t *info_pool;
+} pack_context_t;
+
+static svn_error_t *
+initialize_pack_context(pack_context_t *context,
+                        svn_fs_t *fs,
+                        const char *pack_file_dir,
+                        const char *shard_dir,
+                        svn_revnum_t shard_rev,
+                        apr_size_t max_items,
+                        svn_cancel_func_t cancel_func,
+                        void *cancel_baton,
+                        apr_pool_t *pool)
+{
+  fs_fs_data_t *ffd = fs->fsap_data;
+  const char *temp_dir;
+  apr_size_t max_revs = MIN(ffd->max_files_per_dir, (int)max_items);
+  
+  SVN_ERR_ASSERT(ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT);
+  SVN_ERR_ASSERT(shard_rev % ffd->max_files_per_dir == 0);
+  
+  SVN_ERR(svn_io_temp_dir(&temp_dir, pool));
+
+  context->fs = fs;
+  context->cancel_func = cancel_func;
+  context->cancel_baton = cancel_baton;
+
+  context->shard_rev = shard_rev;
+  context->start_rev = shard_rev;
+  context->end_rev = shard_rev;
+  context->shard_end_rev = shard_rev + ffd->max_files_per_dir;
+  
+  /* Create the new directory and pack file. */
+  context->shard_dir = shard_dir;
+  context->pack_file_dir = pack_file_dir;
+  context->pack_file_path
+    = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
+  SVN_ERR(svn_io_file_open(&context->pack_file, context->pack_file_path,
+                           APR_WRITE | APR_BUFFERED | APR_BINARY | APR_EXCL
+                             | APR_CREATE, APR_OS_DEFAULT, pool));
+
+  /* Index information files */
+  SVN_ERR(svn_fs_fs__l2p_proto_index_open
+            (&context->proto_l2p_index,
+             svn_dirent_join(pack_file_dir,
+                             PATH_INDEX PATH_EXT_L2P_INDEX,
+                             pool),
+             pool));
+  SVN_ERR(svn_fs_fs__p2l_proto_index_open
+            (&context->proto_p2l_index,
+             svn_dirent_join(pack_file_dir,
+                             PATH_INDEX PATH_EXT_P2L_INDEX,
+                             pool),
+             pool));
+
+  context->changes = apr_array_make(pool, max_items,
+                                    sizeof(svn_fs_fs__p2l_entry_t *));
+  SVN_ERR(svn_io_open_unique_file3(&context->changes_file, NULL, temp_dir,
+                                   svn_io_file_del_on_close, pool, pool));
+  context->file_props = apr_array_make(pool, max_items,
+                                       sizeof(svn_fs_fs__p2l_entry_t *));
+  SVN_ERR(svn_io_open_unique_file3(&context->file_props_file, NULL, temp_dir,
+                                   svn_io_file_del_on_close, pool, pool));
+  context->dir_props = apr_array_make(pool, max_items,
+                                      sizeof(svn_fs_fs__p2l_entry_t *));
+  SVN_ERR(svn_io_open_unique_file3(&context->dir_props_file, NULL, temp_dir,
+                                   svn_io_file_del_on_close, pool, pool));
+
+  context->rev_offsets = apr_array_make(pool, max_revs, sizeof(int));
+  context->reps_infos = apr_array_make(pool, max_items, sizeof(rep_info_t *));
+  context->reps = apr_array_make(pool, max_items,
+                                 sizeof(svn_fs_fs__p2l_entry_t *));
+  SVN_ERR(svn_io_open_unique_file3(&context->reps_file, NULL, temp_dir,
+                                   svn_io_file_del_on_close, pool, pool));
+
+  context->info_pool = svn_pool_create(pool);
+
+  return SVN_NO_ERROR;
+};
+
+static svn_error_t *
+reset_pack_context(pack_context_t *context,
+                   apr_pool_t *pool)
+{
+  apr_array_clear(context->changes);
+  SVN_ERR(svn_io_file_trunc(context->changes_file, 0, pool));
+  apr_array_clear(context->file_props);
+  SVN_ERR(svn_io_file_trunc(context->file_props_file, 0, pool));
+  apr_array_clear(context->dir_props);
+  SVN_ERR(svn_io_file_trunc(context->dir_props_file, 0, pool));
+
+  apr_array_clear(context->rev_offsets);
+  apr_array_clear(context->reps_infos);
+  apr_array_clear(context->reps);
+  SVN_ERR(svn_io_file_trunc(context->reps_file, 0, pool));
+
+  svn_pool_clear(context->info_pool);
+  
+  return SVN_NO_ERROR;
+};
+
+static svn_error_t *
+close_pack_context(pack_context_t *context,
+                   apr_pool_t *pool)
+{
+  const char *l2p_index_path
+    = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_L2P_INDEX, NULL);
+  const char *p2l_index_path
+    = apr_pstrcat(pool, context->pack_file_path, PATH_EXT_P2L_INDEX, NULL);
+  const char *proto_l2p_index_path;
+  const char *proto_p2l_index_path;
+
+  SVN_ERR(svn_io_file_name_get(&proto_l2p_index_path,
+                               context->proto_l2p_index, pool));
+  SVN_ERR(svn_io_file_name_get(&proto_p2l_index_path,
+                               context->proto_p2l_index, pool));
+  
+  /* finalize proto index files */
+  SVN_ERR(svn_io_file_close(context->proto_l2p_index, pool));
+  SVN_ERR(svn_io_file_close(context->proto_p2l_index, pool));
+
+  /* Create the actual index files*/
+  SVN_ERR(svn_fs_fs__l2p_index_create(context->fs, l2p_index_path,
+                                      proto_l2p_index_path,
+                                      context->shard_rev, pool));
+  SVN_ERR(svn_fs_fs__p2l_index_create(context->fs, p2l_index_path,
+                                      proto_p2l_index_path,
+                                      context->shard_rev, pool));
+
+  /* remove proto index files */
+  SVN_ERR(svn_io_remove_file2(proto_l2p_index_path, FALSE, pool));
+  SVN_ERR(svn_io_remove_file2(proto_p2l_index_path, FALSE, pool));
+
+  SVN_ERR(svn_io_file_close(context->pack_file, pool));
+
+  return SVN_NO_ERROR;
+};
+
+static svn_error_t *
+copy_file_data(pack_context_t *context,
+               apr_file_t *dest,
+               apr_file_t *source,
+               apr_off_t size,
+               apr_pool_t *pool)
+{
+  /* most non-representation items will be small.  Minimize the buffer
+   * and infrastructure overhead in that case. */
+  enum { STACK_BUFFER_SIZE = 1024 };
+ 
+  if (size < STACK_BUFFER_SIZE)
+    {
+      /* copy small data using a fixed-size buffer on stack */
+      char buffer[STACK_BUFFER_SIZE];
+      SVN_ERR(svn_io_file_read_full2(source, buffer, (apr_size_t)size,
+                                     NULL, NULL, pool));
+      SVN_ERR(svn_io_file_write_full(dest, buffer, (apr_size_t)size,
+                                     NULL, pool));
+    }
+  else
+    {
+      /* use streaming copies for larger data blocks.  That may require
+       * the allocation of larger buffers and we should make sure that
+       * this extra memory is released asap. */
+      fs_fs_data_t *ffd = context->fs->fsap_data;
+      apr_pool_t *copypool = svn_pool_create(pool);
+      char *buffer = apr_palloc(copypool, ffd->block_size);
+
+      while (size)
+        {
+          apr_size_t to_copy = (apr_size_t)(MIN(size, ffd->block_size));
+          if (context->cancel_func)
+            SVN_ERR(context->cancel_func(context->cancel_baton));
+
+          SVN_ERR(svn_io_file_read_full2(source, buffer, to_copy,
+                                         NULL, NULL, pool));
+          SVN_ERR(svn_io_file_write_full(dest, buffer, to_copy,
+                                         NULL, pool));
+
+          size -= to_copy;
+        }
+
+      svn_pool_destroy(copypool);
+    }
+
+  return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+copy_item_to_temp(pack_context_t *context,
+                  apr_array_header_t *entries,
+                  apr_file_t *temp_file,
+                  apr_file_t *rev_file,
+                  svn_fs_fs__p2l_entry_t *entry,
+                  apr_pool_t *pool)
+{
+  svn_fs_fs__p2l_entry_t *new_entry = apr_palloc(context->info_pool,
+                                                 sizeof(*new_entry));
+  *new_entry = *entry;
+  new_entry->offset = 0;
+  SVN_ERR(svn_io_file_seek(temp_file, SEEK_CUR, &new_entry->offset, pool));
+  APR_ARRAY_PUSH(entries, svn_fs_fs__p2l_entry_t *) = new_entry;
+  
+  SVN_ERR(copy_file_data(context, temp_file, rev_file, entry->size, pool));
+  
+  return SVN_NO_ERROR;
+}
+
+static int
+get_item_array_index(pack_context_t *context,
+                     svn_revnum_t revision,
+                     apr_int64_t item_index)
+{
+  assert(revision >= context->start_rev);
+  return (int)item_index + APR_ARRAY_IDX(context->rev_offsets,
+                                         revision - context->start_rev,
+                                         int);
+}
+
+static void
+add_item_rep_mapping(pack_context_t *context,
+                     rep_info_t *info)
+{
+  int idx = get_item_array_index(context,
+                                 info->entry->revision,
+                                 info->entry->item_index);
+
+  while (context->reps_infos->nelts <= idx)
+    APR_ARRAY_PUSH(context->reps_infos, rep_info_t *) = NULL;
+
+  assert(!APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *));
+  APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *) = info;
+}
+
+static svn_error_t *
+copy_rep_to_temp(pack_context_t *context,
+                 apr_file_t *rev_file,
+                 svn_fs_fs__p2l_entry_t *entry,
+                 apr_pool_t *pool)
+{
+  rep_info_t *rep_info = apr_pcalloc(context->info_pool, sizeof(*rep_info));
+  svn_fs_fs__rep_header_t *rep_header;
+  svn_stream_t *stream;
+
+  rep_info->entry = apr_palloc(context->info_pool, sizeof(*rep_info->entry));
+  *rep_info->entry = *entry;
+  rep_info->entry->offset = 0;
+  SVN_ERR(svn_io_file_seek(context->reps_file, SEEK_CUR,
+                           &rep_info->entry->offset, pool));
+  add_item_rep_mapping(context, rep_info);
+
+  stream = svn_stream_from_aprfile2(rev_file, TRUE, pool);
+  SVN_ERR(svn_fs_fs__read_rep_header(&rep_header, stream, pool));
+  svn_stream_close(stream);
+
+  if (   rep_header->is_delta
+      && !rep_header->is_delta_vs_empty
+      && rep_header->base_revision >= context->start_rev)
+    {
+      int idx = get_item_array_index(context, rep_header->base_revision,
+                                       rep_header->base_item_index);
+      if (idx < context->reps_infos->nelts)
+        rep_info->base = APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *);
+    }
+
+  SVN_ERR(svn_io_file_seek(rev_file, SEEK_SET, &entry->offset, pool));
+  SVN_ERR(copy_file_data(context, context->reps_file, rev_file, entry->size,
+                         pool));
+
+  return SVN_NO_ERROR;
+}
+
+static int
+compare_dir_entries(const svn_sort__item_t *a,
+                    const svn_sort__item_t *b)
+{
+  const svn_fs_dirent_t *lhs = (const svn_fs_dirent_t *) a->value;
+  const svn_fs_dirent_t *rhs = (const svn_fs_dirent_t *) b->value;
+  svn_revnum_t lhs_revision;
+  svn_revnum_t rhs_revision;
+  apr_int64_t lhs_item_index;
+  apr_int64_t rhs_item_index;
+
+  if (lhs->kind != rhs->kind)
+    return lhs->kind == svn_node_dir ? -1 : 1;
+
+  lhs_revision = svn_fs_fs__id_rev(lhs->id);
+  rhs_revision = svn_fs_fs__id_rev(rhs->id);
+  if (lhs_revision != rhs_revision)
+    return lhs_revision > rhs_revision ? -1 : 1;
+
+  lhs_item_index = svn_fs_fs__id_item(lhs->id);
+  rhs_item_index = svn_fs_fs__id_item(rhs->id);
+  if (lhs_item_index != rhs_item_index)
+    return lhs_item_index > rhs_item_index ? -1 : 1;
+
+  return 0;
+}
+
+static svn_error_t *
+copy_node_to_temp(pack_context_t *context,
+                  apr_file_t *rev_file,
+                  svn_fs_fs__p2l_entry_t *entry,
+                  apr_pool_t *pool)
+{
+  rep_info_t *rep_info = apr_pcalloc(context->info_pool, sizeof(*rep_info));
+  node_revision_t *noderev;
+  svn_stream_t *stream;
+
+  rep_info->entry = apr_palloc(context->info_pool, sizeof(*rep_info->entry));
+  *rep_info->entry = *entry;
+  rep_info->entry->offset = 0;
+  SVN_ERR(svn_io_file_seek(context->reps_file, SEEK_CUR,
+                           &rep_info->entry->offset, pool));
+  add_item_rep_mapping(context, rep_info);
+
+  stream = svn_stream_from_aprfile2(rev_file, TRUE, pool);
+  SVN_ERR(svn_fs_fs__read_noderev(&noderev, stream, pool));
+  svn_stream_close(stream);
+
+  if (noderev->data_rep && noderev->data_rep->revision >= context->start_rev)
+    {
+      int idx = get_item_array_index(context, noderev->data_rep->revision,
+                                     noderev->data_rep->item_index);
+      if (idx < context->reps_infos->nelts)
+        rep_info->base = APR_ARRAY_IDX(context->reps_infos, idx, rep_info_t *);
+    }
+
+  SVN_ERR(svn_io_file_seek(rev_file, SEEK_SET, &entry->offset, pool));
+  SVN_ERR(copy_file_data(context, context->reps_file, rev_file, entry->size,
+                         pool));
+
+  if (noderev->kind == svn_node_dir && rep_info->base)
+    {
+      apr_hash_t *directory;
+      apr_pool_t *scratch_pool = svn_pool_create(pool);
+      apr_array_header_t *sorted;
+      int i;
+
+      rep_info = rep_info->base;
+      SVN_ERR(svn_fs_fs__rep_contents_dir(&directory, context->fs, noderev,
+                                          scratch_pool));
+      sorted = svn_sort__hash(directory, compare_dir_entries, scratch_pool);
+      for (i = 0; i < sorted->nelts; ++i)
+        {
+          svn_fs_dirent_t *dir_entry
+            = APR_ARRAY_IDX(sorted, i, svn_sort__item_t).value;
+          svn_revnum_t revision = svn_fs_fs__id_rev(dir_entry->id);
+          apr_int64_t item_index = svn_fs_fs__id_item(dir_entry->id);
+
+          if (revision >= context->start_rev)
+            {
+              int idx = get_item_array_index(context, revision, item_index);
+              if (idx < context->reps_infos->nelts)
+                {
+                  rep_info->next = APR_ARRAY_IDX(context->reps_infos, idx,
+                                                 rep_info_t *);
+                  rep_info = rep_info->next;
+                }
+            }
+        }
+
+      svn_pool_destroy(scratch_pool);
+    }
+
+  return SVN_NO_ERROR;
+}
+
+static int
+compare_p2l_info(const svn_fs_fs__p2l_entry_t * const * lhs,
+                 const svn_fs_fs__p2l_entry_t * const * rhs)
+{
+  assert(*lhs != *rhs);
+  
+  if ((*lhs)->revision == (*rhs)->revision)
+    return (*lhs)->item_index > (*rhs)->item_index ? -1 : 1;
+
+  return (*lhs)->revision > (*rhs)->revision ? -1 : 1;
+}
+
+static void
+sort_items(apr_array_header_t *entries)
+{
+  qsort(entries->elts, entries->nelts, entries->elt_size,
+        (int (*)(const void *, const void *))compare_p2l_info);
+}
+
+static int
+compare_p2l_info_rev(const svn_fs_fs__p2l_entry_t * const * lhs,
+                     const svn_fs_fs__p2l_entry_t * const * rhs)
+{
+  assert(*lhs != *rhs);
+
+  if ((*lhs)->revision == (*rhs)->revision)
+    return 0;
+
+  return (*lhs)->revision < (*rhs)->revision ? -1 : 1;
+}
+
+static void
+sort_by_rev(apr_array_header_t *entries)
+{
+  qsort(entries->elts, entries->nelts, entries->elt_size,
+        (int (*)(const void *, const void *))compare_p2l_info_rev);
+}
+
+static void
+pick_recursively(pack_context_t *context,
+                 rep_info_t *info)
+{
+  rep_info_t *temp;
+  do
+    {
+      if (info->entry)
+        {
+          APR_ARRAY_PUSH(context->reps, svn_fs_fs__p2l_entry_t *)
+            = info->entry;
+          info->entry = NULL;
+        }
+
+      if (info->base)
+        {
+          pick_recursively(context, info->base);
+          info->base = NULL;
+        }
+
+      temp = info->next;
+      info->next = NULL;
+      info = temp;
+    }
+  while (info);
+}
+
+static void
+sort_reps(pack_context_t *context)
+{
+  int i;
+  for (i = context->reps_infos->nelts - 1; i >= 0; --i)
+    {
+      rep_info_t *info = APR_ARRAY_IDX(context->reps_infos, i, rep_info_t *);
+      if (   info
+          && info->entry
+          && info->entry->item_index == SVN_FS_FS__ITEM_INDEX_ROOT_NODE)
+        do
+          {
+            APR_ARRAY_PUSH(context->reps, svn_fs_fs__p2l_entry_t *)
+              = info->entry;
+            info->entry = NULL;
+            info = info->base;
+          }
+        while (info && info->entry);
+    }
+
+  for (i = context->reps_infos->nelts - 1; i >= 0; --i)
+    {
+      rep_info_t *info = APR_ARRAY_IDX(context->reps_infos, i, rep_info_t *);
+      if (info && info->entry == NULL && info->next)
+        pick_recursively(context, info);
+    }
+
+  for (i = context->reps_infos->nelts - 1; i >= 0; --i)
+    {
+      rep_info_t *info = APR_ARRAY_IDX(context->reps_infos, i, rep_info_t *);
+      if (info && info->entry)
+        pick_recursively(context, info);
+    }
+}
+
+static svn_error_t *
+copy_items_from_temp(pack_context_t *context,
+                     apr_array_header_t *entries,
+                     apr_file_t *temp_file,
+                     apr_pool_t *pool)
+{
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  int i;
+  for (i = 0; i < entries->nelts; ++i)
+    {
+      svn_fs_fs__p2l_entry_t *entry
+        = APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t *);
+
+      SVN_ERR(svn_io_file_seek(temp_file, SEEK_SET, &entry->offset,
+                               iterpool));
+      SVN_ERR(copy_file_data(context, context->pack_file, temp_file,
+                             entry->size, pool));
+
+      entry->offset = context->pack_offset;
+      context->pack_offset += entry->size;
+       
+      SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry
+                  (context->proto_p2l_index, entry, iterpool));
+    }
+
+  svn_pool_destroy(iterpool);
+
+  return SVN_NO_ERROR;
+}
+
+static void
+append_entries(apr_array_header_t *dest,
+               apr_array_header_t *to_append)
+{
+  int i;
+  for (i = 0; i < to_append->nelts; ++i)
+    APR_ARRAY_PUSH(dest, svn_fs_fs__p2l_entry_t *)
+      = APR_ARRAY_IDX(to_append, i, svn_fs_fs__p2l_entry_t *);
+}
+
+static svn_error_t *
+write_l2p_index(pack_context_t *context,
+                apr_pool_t *pool)
+{
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  svn_revnum_t prev_rev = SVN_INVALID_REVNUM;
+  int i;
+
+  append_entries(context->reps, context->changes);
+  append_entries(context->reps, context->file_props);
+  append_entries(context->reps, context->dir_props);
+  sort_by_rev(context->reps);
+  
+  for (i = 0; i < context->reps->nelts; ++i)
+    {
+      svn_fs_fs__p2l_entry_t *entry
+        = APR_ARRAY_IDX(context->reps, i, svn_fs_fs__p2l_entry_t *);
+
+      if (prev_rev != entry->revision)
+        {
+          prev_rev = entry->revision;
+          SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision
+                      (context->proto_l2p_index, iterpool));
+        }
+
+      SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry
+                  (context->proto_l2p_index,
+                   entry->offset, entry->item_index, iterpool));
+
+      if (i % 256 == 0)
+        svn_pool_clear(iterpool);
+    }
+
+  svn_pool_destroy(iterpool);
+
+  return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+pack_section(pack_context_t *context,
+             apr_pool_t *pool)
+{
+  apr_pool_t *revpool = svn_pool_create(pool);
+  apr_pool_t *iterpool = svn_pool_create(pool);
+
+  svn_revnum_t revision;
+  for (revision = context->start_rev; revision < context->end_rev; ++revision)
+    {
+      apr_off_t offset = 0;
+      apr_finfo_t finfo;
+      apr_file_t *rev_file;
+      
+      /* Get the size of the file. */
+      const char *path = svn_dirent_join(context->shard_dir,
+                                         apr_psprintf(revpool, "%ld",
+                                                      revision),
+                                         revpool);
+      SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, revpool));
+
+      SVN_ERR(svn_io_file_open(&rev_file, path,
+                               APR_READ | APR_BUFFERED | APR_BINARY,
+                               APR_OS_DEFAULT, revpool));
+
+      APR_ARRAY_PUSH(context->rev_offsets, int) = context->reps_infos->nelts;
+  
+      /* read the phys-to-log index file until we covered the whole rev file.
+       * That index contains enough info to build both target indexes from it. */
+      while (offset < finfo.size)
+        {
+          /* read one cluster */
+          int i;
+          apr_array_header_t *entries;
+          SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, context->fs,
+                                              revision, offset,
+                                              iterpool));
+
+          for (i = 0; i < entries->nelts; ++i)
+            {
+              svn_fs_fs__p2l_entry_t *entry
+                = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+
+              /* skip first entry if that was duplicated due to crossing a
+                 cluster boundary */
+              if (offset > entry->offset)
+                continue;
+
+              /* process entry while inside the rev file */
+              offset = entry->offset;
+              if (offset < finfo.size)
+                {
+                  SVN_ERR(svn_io_file_seek(rev_file, SEEK_SET, &offset,
+                                           iterpool));
+
+                  if (entry->type == SVN_FS_FS__ITEM_TYPE_CHANGES)
+                    SVN_ERR(copy_item_to_temp(context,
+                                              context->changes,
+                                              context->changes_file,
+                                              rev_file, entry, iterpool));
+                  else if (entry->type == SVN_FS_FS__ITEM_TYPE_FILE_PROPS)
+                    SVN_ERR(copy_item_to_temp(context,
+                                              context->file_props,
+                                              context->file_props_file,
+                                              rev_file, entry, iterpool));
+                  else if (entry->type == SVN_FS_FS__ITEM_TYPE_DIR_PROPS)
+                    SVN_ERR(copy_item_to_temp(context,
+                                              context->dir_props,
+                                              context->dir_props_file,
+                                              rev_file, entry, iterpool));
+                  else if (   entry->type == SVN_FS_FS__ITEM_TYPE_FILE_REP
+                           || entry->type == SVN_FS_FS__ITEM_TYPE_DIR_REP)
+                    SVN_ERR(copy_rep_to_temp(context, rev_file, entry,
+                                             iterpool));
+                  else if (entry->type == SVN_FS_FS__ITEM_TYPE_NODEREV)
+                    SVN_ERR(copy_node_to_temp(context, rev_file, entry,
+                                              iterpool));
+                  else
+                    SVN_ERR_ASSERT(entry->type == SVN_FS_FS__ITEM_TYPE_UNUSED);
+                    
+                  offset += entry->size;
+                }
+            }
+
+          if (context->cancel_func)
+            SVN_ERR(context->cancel_func(context->cancel_baton));
+
+          svn_pool_clear(iterpool);
+        }
+
+      svn_pool_clear(revpool);
+    }
+
+  svn_pool_destroy(iterpool);
+
+  sort_items(context->changes);
+  sort_items(context->file_props);
+  sort_items(context->dir_props);
+  sort_reps(context);
+  
+  SVN_ERR(copy_items_from_temp(context, context->changes,
+                               context->changes_file, revpool));
+  svn_pool_clear(revpool);
+  SVN_ERR(copy_items_from_temp(context, context->file_props,
+                               context->file_props_file, revpool));
+  svn_pool_clear(revpool);
+  SVN_ERR(copy_items_from_temp(context, context->dir_props,
+                               context->dir_props_file, revpool));
+  svn_pool_clear(revpool);
+  SVN_ERR(copy_items_from_temp(context, context->reps,
+                               context->reps_file, revpool));
+  svn_pool_clear(revpool);
+  SVN_ERR(write_l2p_index(context, revpool));
+
+  svn_pool_destroy(revpool);
+  
+  return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+append_revision(pack_context_t *context,
+                apr_pool_t *pool)
+{
+  apr_off_t offset = 0;
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  apr_file_t *rev_file;
+  apr_finfo_t finfo;
+
+  /* Get the size of the file. */
+  const char *path = svn_dirent_join(context->shard_dir,
+                                     apr_psprintf(iterpool, "%ld",
+                                                  context->start_rev),
+                                     pool);
+  SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, pool));
+
+  /* Copy all the bits from the rev file to the end of the pack file. */
+  SVN_ERR(svn_io_file_open(&rev_file, path,
+                           APR_READ | APR_BUFFERED | APR_BINARY,
+                           APR_OS_DEFAULT, pool));
+  SVN_ERR(copy_file_data(context, context->pack_file, rev_file, finfo.size, 
+                         iterpool));
+
+  /* mark the start of a new revision */
+  SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision(context->proto_l2p_index,
+                                                  pool));
+
+  /* read the phys-to-log index file until we covered the whole rev file.
+   * That index contains enough info to build both target indexes from it. */
+  while (offset < finfo.size)
+    {
+      /* read one cluster */
+      int i;
+      apr_array_header_t *entries;
+      SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, context->fs,
+                                          context->start_rev, offset,
+                                          iterpool));
+
+      for (i = 0; i < entries->nelts; ++i)
+        {
+          svn_fs_fs__p2l_entry_t *entry
+            = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+
+          /* skip first entry if that was duplicated due to crossing a
+             cluster boundary */
+          if (offset > entry->offset)
+            continue;
+
+          /* process entry while inside the rev file */
+          offset = entry->offset;
+          if (offset < finfo.size)
+            {
+              entry->offset += context->pack_offset;
+              offset += entry->size;
+              SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry
+                        (context->proto_l2p_index,
+                         entry->offset, entry->item_index, iterpool));
+              SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry
+                        (context->proto_p2l_index, entry, iterpool));
+            }
+        }
+
+      svn_pool_clear(iterpool);
+    }
+
+  svn_pool_destroy(iterpool);
+  context->pack_offset += finfo.size;
+
+  return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+pack_log_addressed(svn_fs_t *fs,
+                   const char *pack_file_dir,
+                   const char *shard_dir,
+                   svn_revnum_t shard_rev,
+                   apr_size_t max_mem,
+                   svn_cancel_func_t cancel_func,
+                   void *cancel_baton,
+                   apr_pool_t *pool)
+{
+  enum
+    {
+      /* estimated amount of memory used to represent one item in memory
+       * during rev file packing */
+      PER_ITEM_MEM = APR_ALIGN_DEFAULT(sizeof(rep_info_t))
+                   + APR_ALIGN_DEFAULT(sizeof(svn_fs_fs__p2l_entry_t))
+                   + 6 * sizeof(void*)
+    };
+
+  apr_size_t max_items = max_mem / PER_ITEM_MEM;
+  apr_array_header_t *max_ids;
+  pack_context_t context = { 0 };
+  int i;
+  apr_size_t item_count = 0;
+
+  SVN_ERR(initialize_pack_context(&context, fs, pack_file_dir, shard_dir,
+                                  shard_rev, max_items, cancel_func,
+                                  cancel_baton, pool));
+ 
+  SVN_ERR(svn_fs_fs__l2p_get_max_ids(&max_ids, fs, shard_rev,
+                                     context.shard_end_rev - shard_rev,
+                                     pool));
+
+  for (i = 0; i < max_ids->nelts; ++i)
+    if (APR_ARRAY_IDX(max_ids, i, apr_uint64_t) + item_count <= max_items)
+      {
+        context.end_rev++;
+      }
+    else
+      {
+        if (context.start_rev < context.end_rev)
+          {
+            SVN_ERR(pack_section(&context, pool));
+            SVN_ERR(reset_pack_context(&context, pool));
+            item_count = 0;
+          }
+
+        context.start_rev = i + context.shard_rev;
+        context.end_rev = context.start_rev + 1;
+
+        if (APR_ARRAY_IDX(max_ids, i, apr_uint64_t) > max_items)
+          {
+            SVN_ERR(append_revision(&context, pool));
+            context.start_rev++;
+          }
+        else
+          item_count += (apr_size_t)APR_ARRAY_IDX(max_ids, i, apr_uint64_t);
+      }
+      
+  if (context.start_rev < context.end_rev)
+    SVN_ERR(pack_section(&context, pool));
+
+  SVN_ERR(reset_pack_context(&context, pool));
+  SVN_ERR(close_pack_context(&context, pool));
+
+  return SVN_NO_ERROR;
+}
+
 /* Given REV in FS, set *REV_OFFSET to REV's offset in the packed file.
    Use POOL for temporary allocations. */
 svn_error_t *
@@ -98,66 +945,70 @@ svn_fs_fs__get_packed_offset(apr_off_t *
   return svn_cache__set(ffd->packed_offset_cache, &shard, manifest, pool);
 }
 
-/* Copy the index information from the unpacked revision REV in FS to the
- * PROTO_L2P_INDEX and PROTO_P2L_INDEX proto index files, respectively.
- * Assume that the rev file will be appended to the pack file at offset
- * PACK_OFFSET and that the unpacked rev file contains FILE_SIZE bytes.
- * Use POOL for allocations.
- */
 static svn_error_t *
-copy_indexes(svn_fs_t *fs,
-             apr_file_t *proto_l2p_index,
-             apr_file_t *proto_p2l_index,
-             svn_revnum_t rev,
-             apr_off_t pack_offset,
-             apr_off_t file_size,
-             apr_pool_t *pool)
+pack_phys_addressed(svn_fs_t *fs,
+                    const char *pack_file_dir,
+                    const char *shard_path,
+                    svn_revnum_t start_rev,
+                    int max_files_per_dir,
+                    svn_cancel_func_t cancel_func,
+                    void *cancel_baton,
+                    apr_pool_t *pool)
 {
-  apr_off_t offset = 0;
-  apr_pool_t *iterpool = svn_pool_create(pool);
+  const char *pack_file_path, *manifest_file_path;
+  svn_stream_t *pack_stream, *manifest_stream;
+  svn_revnum_t end_rev, rev;
+  apr_off_t next_offset;
+  apr_pool_t *iterpool;
 
-  /* mark the start of a new revision */
-  SVN_ERR(svn_fs_fs__l2p_proto_index_add_revision(proto_l2p_index, pool));
+  /* Some useful paths. */
+  pack_file_path = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
+  manifest_file_path = svn_dirent_join(pack_file_dir, PATH_MANIFEST, pool);
 
-  /* read the phys-to-log index file until we covered the whole rev file.
-   * That index contains enough info to build both target indexes from it. */
-  while (offset < file_size)
-    {
-      /* read one cluster */
-      int i;
-      apr_array_header_t *entries;
-      SVN_ERR(svn_fs_fs__p2l_index_lookup(&entries, fs, rev, offset,
-                                          iterpool));
+  /* Create the new pack file. */
+  SVN_ERR(svn_stream_open_writable(&pack_stream, pack_file_path, pool,
+                                    pool));
 
-      for (i = 0; i < entries->nelts; ++i)
-        {
-          svn_fs_fs__p2l_entry_t *entry
-            = &APR_ARRAY_IDX(entries, i, svn_fs_fs__p2l_entry_t);
+  /* Create the manifest file. */
+  SVN_ERR(svn_stream_open_writable(&manifest_stream, manifest_file_path,
+                                   pool, pool));
 
-          /* skip first entry if that was duplicated due crossing a
-             cluster boundary */
-          if (offset > entry->offset)
-            continue;
+  end_rev = start_rev + max_files_per_dir - 1;
+  next_offset = 0;
+  iterpool = svn_pool_create(pool);
 
-          /* process entry while inside the rev file */
-          offset = entry->offset;
-          if (offset < file_size)
-            {
-              entry->offset += pack_offset;
-              offset += entry->size;
-              SVN_ERR(svn_fs_fs__l2p_proto_index_add_entry(proto_l2p_index,
-                                                           entry->offset,
-                                                           entry->item_index,
-                                                           iterpool));
-              SVN_ERR(svn_fs_fs__p2l_proto_index_add_entry(proto_p2l_index,
-                                                           entry,
-                                                           iterpool));
-            }
-        }
+  /* Iterate over the revisions in this shard, squashing them together. */
+  for (rev = start_rev; rev <= end_rev; rev++)
+    {
+      svn_stream_t *rev_stream;
+      apr_finfo_t finfo;
+      const char *path;
 
       svn_pool_clear(iterpool);
+
+      /* Get the size of the file. */
+      path = svn_dirent_join(shard_path, apr_psprintf(iterpool, "%ld", rev),
+                             iterpool);
+      SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, iterpool));
+
+      /* build manifest */
+      SVN_ERR(svn_stream_printf(manifest_stream, iterpool,
+                                "%" APR_OFF_T_FMT "\n", next_offset));
+      next_offset += finfo.size;
+
+      /* Copy all the bits from the rev file to the end of the pack file. */
+      SVN_ERR(svn_stream_open_readonly(&rev_stream, path, iterpool, iterpool));
+      SVN_ERR(svn_stream_copy3(rev_stream, svn_stream_disown(pack_stream,
+                                                             iterpool),
+                               cancel_func, cancel_baton, iterpool));
     }
 
+  /* disallow write access to the manifest file */
+  SVN_ERR(svn_stream_close(manifest_stream));
+  SVN_ERR(svn_io_set_file_read_only(manifest_file_path, FALSE, iterpool));
+
+  SVN_ERR(svn_stream_close(pack_stream));
+
   svn_pool_destroy(iterpool);
 
   return SVN_NO_ERROR;
@@ -176,29 +1027,17 @@ pack_rev_shard(svn_fs_t *fs,
                const char *shard_path,
                apr_int64_t shard,
                int max_files_per_dir,
+               apr_size_t max_mem,
                svn_cancel_func_t cancel_func,
                void *cancel_baton,
                apr_pool_t *pool)
 {
   fs_fs_data_t *ffd = fs->fsap_data;
-  const char *pack_file_path, *manifest_file_path;
-  const char *proto_l2p_index_path, *proto_p2l_index_path;
-  const char *l2p_index_path, *p2l_index_path;
-  svn_stream_t *pack_stream, *manifest_stream;
-  svn_revnum_t start_rev, end_rev, rev;
-  apr_off_t next_offset;
-  apr_pool_t *iterpool;
-  apr_file_t *proto_l2p_index, *proto_p2l_index;
+  const char *pack_file_path;
+  svn_revnum_t shard_rev = (svn_revnum_t) (shard * max_files_per_dir);
 
   /* Some useful paths. */
   pack_file_path = svn_dirent_join(pack_file_dir, PATH_PACKED, pool);
-  manifest_file_path = svn_dirent_join(pack_file_dir, PATH_MANIFEST, pool);
-  l2p_index_path = apr_pstrcat(pool, pack_file_path, PATH_EXT_L2P_INDEX, NULL);
-  p2l_index_path = apr_pstrcat(pool, pack_file_path, PATH_EXT_P2L_INDEX, NULL);
-  proto_l2p_index_path = svn_dirent_join(pack_file_dir,
-                                         PATH_INDEX PATH_EXT_L2P_INDEX, pool);
-  proto_p2l_index_path = svn_dirent_join(pack_file_dir,
-                                         PATH_INDEX PATH_EXT_P2L_INDEX, pool);
 
   /* Remove any existing pack file for this shard, since it is incomplete. */
   SVN_ERR(svn_io_remove_dir2(pack_file_dir, TRUE, cancel_func, cancel_baton,
@@ -206,91 +1045,18 @@ pack_rev_shard(svn_fs_t *fs,
 
   /* Create the new directory and pack file. */
   SVN_ERR(svn_io_dir_make(pack_file_dir, APR_OS_DEFAULT, pool));
-  SVN_ERR(svn_stream_open_writable(&pack_stream, pack_file_path, pool,
-                                    pool));
 
   /* Index information files */
   if (ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT)
-    {
-      /* Create proto index files*/
-      SVN_ERR(svn_fs_fs__l2p_proto_index_open(&proto_l2p_index, 
-                                              proto_l2p_index_path, pool));
-      SVN_ERR(svn_fs_fs__p2l_proto_index_open(&proto_p2l_index,
-                                              proto_p2l_index_path, pool));
-    }
+    SVN_ERR(pack_log_addressed(fs, pack_file_dir, shard_path, shard_rev,
+                               max_mem, cancel_func, cancel_baton, pool));
   else
-    {
-      /* Create the manifest file. */
-      SVN_ERR(svn_stream_open_writable(&manifest_stream, manifest_file_path,
-                                       pool, pool));
-    }
+    SVN_ERR(pack_phys_addressed(fs, pack_file_dir, shard_path, shard_rev,
+                                max_files_per_dir, cancel_func,
+                                cancel_baton, pool));
   
-  start_rev = (svn_revnum_t) (shard * max_files_per_dir);
-  end_rev = (svn_revnum_t) ((shard + 1) * (max_files_per_dir) - 1);
-  next_offset = 0;
-  iterpool = svn_pool_create(pool);
-
-  /* Iterate over the revisions in this shard, squashing them together. */
-  for (rev = start_rev; rev <= end_rev; rev++)
-    {
-      svn_stream_t *rev_stream;
-      apr_finfo_t finfo;
-      const char *path;
-
-      svn_pool_clear(iterpool);
-
-      /* Get the size of the file. */
-      path = svn_dirent_join(shard_path, apr_psprintf(iterpool, "%ld", rev),
-                             iterpool);
-      SVN_ERR(svn_io_stat(&finfo, path, APR_FINFO_SIZE, iterpool));
-
-      /* build indexes / manifest */
-      if (ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT)
-        SVN_ERR(copy_indexes(fs, proto_l2p_index, proto_p2l_index,
-                             rev, next_offset, finfo.size, pool));
-      else
-        SVN_ERR(svn_stream_printf(manifest_stream, iterpool,
-                                  "%" APR_OFF_T_FMT "\n", next_offset));
-      next_offset += finfo.size;
-
-      /* Copy all the bits from the rev file to the end of the pack file. */
-      SVN_ERR(svn_stream_open_readonly(&rev_stream, path, iterpool, iterpool));
-      SVN_ERR(svn_stream_copy3(rev_stream, svn_stream_disown(pack_stream,
-                                                             iterpool),
-                          cancel_func, cancel_baton, iterpool));
-    }
-
-  /* Finalize Index information files */
-  if (ffd->format >= SVN_FS_FS__MIN_LOG_ADDRESSING_FORMAT)
-    {
-      /* finalize proto index files */
-      SVN_ERR(svn_io_file_close(proto_l2p_index, iterpool));
-      SVN_ERR(svn_io_file_close(proto_p2l_index, iterpool));
-      
-      /* Create the actual index files*/
-      SVN_ERR(svn_fs_fs__l2p_index_create(fs, l2p_index_path,
-                                          proto_l2p_index_path,
-                                          start_rev, iterpool));
-      SVN_ERR(svn_fs_fs__p2l_index_create(fs, p2l_index_path,
-                                          proto_p2l_index_path,
-                                          start_rev, iterpool));
-
-      /* remove proto index files */
-      SVN_ERR(svn_io_remove_file2(proto_l2p_index_path, FALSE, iterpool));
-      SVN_ERR(svn_io_remove_file2(proto_p2l_index_path, FALSE, iterpool));
-    }
-  else
-    {
-      /* disallow write access to the manifest file */
-      SVN_ERR(svn_stream_close(manifest_stream));
-      SVN_ERR(svn_io_set_file_read_only(manifest_file_path, FALSE, iterpool));
-    }
-
-  SVN_ERR(svn_stream_close(pack_stream));
-  SVN_ERR(svn_io_copy_perms(shard_path, pack_file_dir, iterpool));
-  SVN_ERR(svn_io_set_file_read_only(pack_file_path, FALSE, iterpool));
-
-  svn_pool_destroy(iterpool);
+  SVN_ERR(svn_io_copy_perms(shard_path, pack_file_dir, pool));
+  SVN_ERR(svn_io_set_file_read_only(pack_file_path, FALSE, pool));
 
   return SVN_NO_ERROR;
 }
@@ -341,7 +1107,7 @@ pack_shard(const char *revs_dir,
 
   /* pack the revision content */
   SVN_ERR(pack_rev_shard(fs, rev_pack_file_dir, rev_shard_path,
-                         shard, max_files_per_dir,
+                         shard, max_files_per_dir, 64 * 1024 * 1024,
                          cancel_func, cancel_baton, pool));
 
   /* if enabled, pack the revprops in an equivalent way */



Mime
View raw message