bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-922: CreaterId in ledger metadata
Date Mon, 11 Jul 2016 21:01:36 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master e5f3afa5e -> 6d71b8292


BOOKKEEPER-922: CreaterId in ledger metadata

Introduces a generic Map<String, byte[]> customMetadata to be stored as part of ledger metadata on the metadata server(zk).

Author: Rithin Shetty <rithingmail.com>

Author: Rithin <rithin.shetty@salesforce.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

Closes #46 from rithin-shetty/JIRA-922-ledgerCustomMetadata


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/6d71b829
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/6d71b829
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/6d71b829

Branch: refs/heads/master
Commit: 6d71b8292bc255e21f2dcf048be191650c01b4dc
Parents: e5f3afa
Author: Rithin <rithin.shetty@salesforce.com>
Authored: Mon Jul 11 14:01:30 2016 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon Jul 11 14:01:30 2016 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BookKeeper.java    |  80 +-
 .../bookkeeper/client/LedgerCreateOp.java       |  13 +-
 .../apache/bookkeeper/client/LedgerHandle.java  |  10 +
 .../bookkeeper/client/LedgerMetadata.java       |  75 +-
 .../apache/bookkeeper/proto/DataFormats.java    | 753 ++++++++++++++++++-
 .../src/main/proto/DataFormats.proto            |   8 +-
 .../client/BookieWriteLedgerTest.java           |  50 ++
 7 files changed, 948 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6d71b829/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 101f5e8..75ab759 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.client;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -431,7 +432,7 @@ public class BookKeeper implements AutoCloseable {
                                   final DigestType digestType,
                                   final byte[] passwd, final CreateCallback cb, final Object ctx)
     {
-        asyncCreateLedger(ensSize, writeQuorumSize, writeQuorumSize, digestType, passwd, cb, ctx);
+        asyncCreateLedger(ensSize, writeQuorumSize, writeQuorumSize, digestType, passwd, cb, ctx, null);
     }
 
     /**
@@ -462,13 +463,13 @@ public class BookKeeper implements AutoCloseable {
      *          createCallback implementation
      * @param ctx
      *          optional control object
+     * @param customMetadata
+     *          optional customMetadata that holds user specified metadata
      */
 
-    public void asyncCreateLedger(final int ensSize,
-                                  final int writeQuorumSize,
-                                  final int ackQuorumSize,
-                                  final DigestType digestType,
-                                  final byte[] passwd, final CreateCallback cb, final Object ctx) {
+    public void asyncCreateLedger(final int ensSize, final int writeQuorumSize, final int ackQuorumSize,
+                                  final DigestType digestType, final byte[] passwd,
+                                  final CreateCallback cb, final Object ctx, final Map<String, byte[]> customMetadata) {
         if (writeQuorumSize < ackQuorumSize) {
             throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
         }
@@ -479,7 +480,7 @@ public class BookKeeper implements AutoCloseable {
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx)
+                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata)
                 .initiate();
         } finally {
             closeLock.readLock().unlock();
@@ -519,7 +520,27 @@ public class BookKeeper implements AutoCloseable {
     public LedgerHandle createLedger(int ensSize, int qSize,
                                      DigestType digestType, byte passwd[])
             throws InterruptedException, BKException {
-        return createLedger(ensSize, qSize, qSize, digestType, passwd);
+        return createLedger(ensSize, qSize, qSize, digestType, passwd, null);
+    }
+
+    /**
+     * Synchronous call to create ledger. Parameters match those of
+     * {@link #asyncCreateLedger(int, int, DigestType, byte[],
+     *                           AsyncCallback.CreateCallback, Object)}
+     *
+     * @param ensSize
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @param digestType
+     * @param passwd
+     * @return a handle to the newly created ledger
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
+            DigestType digestType, byte passwd[])
+            throws InterruptedException, BKException {
+        return createLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, null);
     }
 
     /**
@@ -532,12 +553,13 @@ public class BookKeeper implements AutoCloseable {
      * @param ackQuorumSize
      * @param digestType
      * @param passwd
+     * @param customMetadata
      * @return a handle to the newly created ledger
      * @throws InterruptedException
      * @throws BKException
      */
     public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
-                                     DigestType digestType, byte passwd[])
+                                     DigestType digestType, byte passwd[], final Map<String, byte[]> customMetadata)
             throws InterruptedException, BKException {
         SyncCounter counter = new SyncCounter();
         counter.inc();
@@ -545,7 +567,7 @@ public class BookKeeper implements AutoCloseable {
          * Calls asynchronous version
          */
         asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
-                          new SyncCreateCallback(), counter);
+                          new SyncCreateCallback(), counter, customMetadata);
 
         /*
          * Wait
@@ -574,6 +596,7 @@ public class BookKeeper implements AutoCloseable {
      * @param ackQuorumSize
      * @param digestType
      * @param passwd
+     * @param customMetadata
      * @return a handle to the newly created ledger
      * @throws InterruptedException
      * @throws BKException
@@ -581,13 +604,36 @@ public class BookKeeper implements AutoCloseable {
     public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize,
                                         DigestType digestType, byte passwd[])
             throws InterruptedException, BKException {
+        return createLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, null);
+    }
+
+    /**
+     * Synchronous call to create ledger.
+     * Creates a new ledger asynchronously and returns {@link LedgerHandleAdv} which can accept entryId.
+     * Parameters must match those of
+     * {@link #asyncCreateLedgerAdv(int, int, int, DigestType, byte[],
+     *                           AsyncCallback.CreateCallback, Object)}
+     *
+     * @param ensSize
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @param digestType
+     * @param passwd
+     * @param customMetadata
+     * @return a handle to the newly created ledger
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize,
+                                        DigestType digestType, byte passwd[], final Map<String, byte[]> customMetadata)
+            throws InterruptedException, BKException {
         SyncCounter counter = new SyncCounter();
         counter.inc();
         /*
          * Calls asynchronous version
          */
         asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
-                             new SyncCreateCallback(), counter);
+                             new SyncCreateCallback(), counter, customMetadata);
 
         /*
          * Wait
@@ -633,12 +679,12 @@ public class BookKeeper implements AutoCloseable {
      *          createCallback implementation
      * @param ctx
      *          optional control object
+     * @param customMetadata
+     *          optional customMetadata that holds user specified metadata
      */
-    public void asyncCreateLedgerAdv(final int ensSize,
-                                     final int writeQuorumSize,
-                                     final int ackQuorumSize,
-                                     final DigestType digestType,
-                                     final byte[] passwd, final CreateCallback cb, final Object ctx) {
+    public void asyncCreateLedgerAdv(final int ensSize, final int writeQuorumSize, final int ackQuorumSize,
+            final DigestType digestType, final byte[] passwd, final CreateCallback cb, final Object ctx,
+            final Map<String, byte[]> customMetadata) {
         if (writeQuorumSize < ackQuorumSize) {
             throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
         }
@@ -649,7 +695,7 @@ public class BookKeeper implements AutoCloseable {
                 return;
             }
             new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
-                               ackQuorumSize, digestType, passwd, cb, ctx).initiateAdv();
+                               ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv();
         } finally {
             closeLock.readLock().unlock();
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6d71b829/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 6f794d0..3626ce0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
@@ -75,14 +76,14 @@ class LedgerCreateOp implements GenericCallback<Void> {
      *       callback implementation
      * @param ctx
      *       optional control object
+     * @param customMetadata
+     *       A map of user specified custom metadata about the ledger to be persisted; will not try to
+     *       preserve the order(e.g. sortedMap) upon later retireval.
      */
-
-    LedgerCreateOp(BookKeeper bk, int ensembleSize,
-                   int writeQuorumSize, int ackQuorumSize,
-                   DigestType digestType,
-                   byte[] passwd, CreateCallback cb, Object ctx) {
+    LedgerCreateOp(BookKeeper bk, int ensembleSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType,
+            byte[] passwd, CreateCallback cb, Object ctx, final Map<String, byte[]> customMetadata) {
         this.bk = bk;
-        this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd);
+        this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd, customMetadata);
         this.digestType = digestType;
         this.passwd = passwd;
         this.cb = cb;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6d71b829/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index dff1f77..edb5873 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -180,6 +181,15 @@ public class LedgerHandle implements AutoCloseable {
     }
 
     /**
+     * Get this ledger's customMetadata map.
+     *
+     * @return map containing user provided customMetadata.
+     */
+    public Map<String, byte[]> getCustomMetadata() {
+        return metadata.getCustomMetadata();
+    }
+
+    /**
      * Get the DigestManager
      *
      * @return DigestManager for the LedgerHandle

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6d71b829/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index a58adba..cfe3e50 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
+
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
 import org.apache.bookkeeper.versioning.Version;
@@ -31,13 +32,16 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
 import static com.google.common.base.Charsets.UTF_8;
+
 import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
 
 /**
  * This class encapsulates all the ledger metadata that is persistently stored
@@ -79,8 +83,10 @@ public class LedgerMetadata {
     private LedgerMetadataFormat.DigestType digestType;
     private byte[] password;
 
+    private Map<String, byte[]> customMetadata = Maps.newHashMap();
+
     public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-                          BookKeeper.DigestType digestType, byte[] password) {
+                          BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata) {
         this.ensembleSize = ensembleSize;
         this.writeQuorumSize = writeQuorumSize;
         this.ackQuorumSize = ackQuorumSize;
@@ -99,6 +105,14 @@ public class LedgerMetadata {
             LedgerMetadataFormat.DigestType.HMAC : LedgerMetadataFormat.DigestType.CRC32;
         this.password = Arrays.copyOf(password, password.length);
         this.hasPassword = true;
+        if (customMetadata != null) {
+            this.customMetadata = customMetadata;
+        }
+    }
+
+    public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+            BookKeeper.DigestType digestType, byte[] password) {
+        this(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, null);
     }
 
     /**
@@ -124,6 +138,7 @@ public class LedgerMetadata {
             ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(entry.getValue());
             this.addEnsemble(startEntryId, newEnsemble);
         }
+        this.customMetadata = other.customMetadata;
     }
 
     private LedgerMetadata() {
@@ -255,6 +270,14 @@ public class LedgerMetadata {
         }
     }
 
+    public Map<String, byte[]> getCustomMetadata() {
+        return this.customMetadata;
+    }
+
+    void setCustomMetadata(Map<String, byte[]> customMetadata) {
+        this.customMetadata = customMetadata;
+    }
+
     /**
      * Generates a byte array of this object
      *
@@ -273,6 +296,14 @@ public class LedgerMetadata {
             builder.setDigestType(digestType).setPassword(ByteString.copyFrom(password));
         }
 
+        if (customMetadata != null) {
+            LedgerMetadataFormat.cMetadataMapEntry.Builder cMetadataBuilder = LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
+            for (String key : customMetadata.keySet()) {
+                cMetadataBuilder.setKey(key).setValue(ByteString.copyFrom(customMetadata.get(key)));
+                builder.addCustomMetadata(cMetadataBuilder.build());
+            }
+        }
+
         for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
             LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
             segmentBuilder.setFirstEntryId(entry.getKey());
@@ -395,6 +426,14 @@ public class LedgerMetadata {
             }
             lc.addEnsemble(s.getFirstEntryId(), addrs);
         }
+
+        if (data.getCustomMetadataCount() > 0) {
+            List<LedgerMetadataFormat.cMetadataMapEntry> cMetadataList = data.getCustomMetadataList();
+            lc.customMetadata = Maps.newHashMap();
+            for (LedgerMetadataFormat.cMetadataMapEntry ent : cMetadataList) {
+                lc.customMetadata.put(ent.getKey(), ent.getValue().toByteArray());
+            }
+        }
         return lc;
     }
 
@@ -467,6 +506,37 @@ public class LedgerMetadata {
     }
 
     /**
+     * Routine to compare two Map<String, byte[]>; Since the values in the map are byte[], we can't use Map.equals
+     * @param first
+     *          The first map
+     * @param second
+     *          The second map to compare with
+     * @return true if the 2 maps contain the exact set of <K,V> pairs.
+     */
+    public static boolean areByteArrayValMapsEqual(Map<String, byte[]> first, Map<String, byte[]> second) {
+        if(first == null && second == null) {
+            return true;
+        }
+
+        // above check confirms that both are not null;
+        // if one is null the other isn't; so they must
+        // be different
+        if (first == null || second == null) {
+            return false;
+        }
+
+        if (first.size() != second.size()) {
+            return false;
+        }
+        for (Map.Entry<String, byte[]> entry : first.entrySet()) {
+            if (!Arrays.equals(entry.getValue(), second.get(entry.getKey()))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
      * Is the metadata conflict with new updated metadata.
      *
      * @param newMeta
@@ -487,7 +557,8 @@ public class LedgerMetadata {
             length != newMeta.length ||
             state != newMeta.state ||
             !digestType.equals(newMeta.digestType) ||
-            !Arrays.equals(password, newMeta.password)) {
+            !Arrays.equals(password, newMeta.password) ||
+            !LedgerMetadata.areByteArrayValMapsEqual(customMetadata, newMeta.customMetadata)) {
             return true;
         }
         if (state == LedgerMetadataFormat.State.CLOSED

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6d71b829/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
index 4552144..98f4764 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
@@ -56,6 +56,16 @@ public final class DataFormats {
     // optional int64 ctime = 10;
     boolean hasCtime();
     long getCtime();
+    
+    // repeated .LedgerMetadataFormat.cMetadataMapEntry customMetadata = 11;
+    java.util.List<org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry> 
+        getCustomMetadataList();
+    org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry getCustomMetadata(int index);
+    int getCustomMetadataCount();
+    java.util.List<? extends org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder> 
+        getCustomMetadataOrBuilderList();
+    org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder getCustomMetadataOrBuilder(
+        int index);
   }
   public static final class LedgerMetadataFormat extends
       com.google.protobuf.GeneratedMessage
@@ -683,6 +693,441 @@ public final class DataFormats {
       // @@protoc_insertion_point(class_scope:LedgerMetadataFormat.Segment)
     }
     
+    public interface cMetadataMapEntryOrBuilder
+        extends com.google.protobuf.MessageOrBuilder {
+      
+      // optional string key = 1;
+      boolean hasKey();
+      String getKey();
+      
+      // optional bytes value = 2;
+      boolean hasValue();
+      com.google.protobuf.ByteString getValue();
+    }
+    public static final class cMetadataMapEntry extends
+        com.google.protobuf.GeneratedMessage
+        implements cMetadataMapEntryOrBuilder {
+      // Use cMetadataMapEntry.newBuilder() to construct.
+      private cMetadataMapEntry(Builder builder) {
+        super(builder);
+      }
+      private cMetadataMapEntry(boolean noInit) {}
+      
+      private static final cMetadataMapEntry defaultInstance;
+      public static cMetadataMapEntry getDefaultInstance() {
+        return defaultInstance;
+      }
+      
+      public cMetadataMapEntry getDefaultInstanceForType() {
+        return defaultInstance;
+      }
+      
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerMetadataFormat_cMetadataMapEntry_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerMetadataFormat_cMetadataMapEntry_fieldAccessorTable;
+      }
+      
+      private int bitField0_;
+      // optional string key = 1;
+      public static final int KEY_FIELD_NUMBER = 1;
+      private java.lang.Object key_;
+      public boolean hasKey() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getKey() {
+        java.lang.Object ref = key_;
+        if (ref instanceof String) {
+          return (String) ref;
+        } else {
+          com.google.protobuf.ByteString bs = 
+              (com.google.protobuf.ByteString) ref;
+          String s = bs.toStringUtf8();
+          if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+            key_ = s;
+          }
+          return s;
+        }
+      }
+      private com.google.protobuf.ByteString getKeyBytes() {
+        java.lang.Object ref = key_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+          key_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      
+      // optional bytes value = 2;
+      public static final int VALUE_FIELD_NUMBER = 2;
+      private com.google.protobuf.ByteString value_;
+      public boolean hasValue() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public com.google.protobuf.ByteString getValue() {
+        return value_;
+      }
+      
+      private void initFields() {
+        key_ = "";
+        value_ = com.google.protobuf.ByteString.EMPTY;
+      }
+      private byte memoizedIsInitialized = -1;
+      public final boolean isInitialized() {
+        byte isInitialized = memoizedIsInitialized;
+        if (isInitialized != -1) return isInitialized == 1;
+        
+        memoizedIsInitialized = 1;
+        return true;
+      }
+      
+      public void writeTo(com.google.protobuf.CodedOutputStream output)
+                          throws java.io.IOException {
+        getSerializedSize();
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          output.writeBytes(1, getKeyBytes());
+        }
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          output.writeBytes(2, value_);
+        }
+        getUnknownFields().writeTo(output);
+      }
+      
+      private int memoizedSerializedSize = -1;
+      public int getSerializedSize() {
+        int size = memoizedSerializedSize;
+        if (size != -1) return size;
+      
+        size = 0;
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeBytesSize(1, getKeyBytes());
+        }
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeBytesSize(2, value_);
+        }
+        size += getUnknownFields().getSerializedSize();
+        memoizedSerializedSize = size;
+        return size;
+      }
+      
+      private static final long serialVersionUID = 0L;
+      @java.lang.Override
+      protected java.lang.Object writeReplace()
+          throws java.io.ObjectStreamException {
+        return super.writeReplace();
+      }
+      
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(
+          com.google.protobuf.ByteString data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data).buildParsed();
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(
+          com.google.protobuf.ByteString data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data, extensionRegistry)
+                 .buildParsed();
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(byte[] data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data).buildParsed();
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(
+          byte[] data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data, extensionRegistry)
+                 .buildParsed();
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(java.io.InputStream input)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input).buildParsed();
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input, extensionRegistry)
+                 .buildParsed();
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseDelimitedFrom(java.io.InputStream input)
+          throws java.io.IOException {
+        Builder builder = newBuilder();
+        if (builder.mergeDelimitedFrom(input)) {
+          return builder.buildParsed();
+        } else {
+          return null;
+        }
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseDelimitedFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        Builder builder = newBuilder();
+        if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+          return builder.buildParsed();
+        } else {
+          return null;
+        }
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(
+          com.google.protobuf.CodedInputStream input)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input).buildParsed();
+      }
+      public static org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry parseFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input, extensionRegistry)
+                 .buildParsed();
+      }
+      
+      public static Builder newBuilder() { return Builder.create(); }
+      public Builder newBuilderForType() { return newBuilder(); }
+      public static Builder newBuilder(org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry prototype) {
+        return newBuilder().mergeFrom(prototype);
+      }
+      public Builder toBuilder() { return newBuilder(this); }
+      
+      @java.lang.Override
+      protected Builder newBuilderForType(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        Builder builder = new Builder(parent);
+        return builder;
+      }
+      public static final class Builder extends
+          com.google.protobuf.GeneratedMessage.Builder<Builder>
+         implements org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder {
+        public static final com.google.protobuf.Descriptors.Descriptor
+            getDescriptor() {
+          return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerMetadataFormat_cMetadataMapEntry_descriptor;
+        }
+        
+        protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+            internalGetFieldAccessorTable() {
+          return org.apache.bookkeeper.proto.DataFormats.internal_static_LedgerMetadataFormat_cMetadataMapEntry_fieldAccessorTable;
+        }
+        
+        // Construct using org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.newBuilder()
+        private Builder() {
+          maybeForceBuilderInitialization();
+        }
+        
+        private Builder(BuilderParent parent) {
+          super(parent);
+          maybeForceBuilderInitialization();
+        }
+        private void maybeForceBuilderInitialization() {
+          if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          }
+        }
+        private static Builder create() {
+          return new Builder();
+        }
+        
+        public Builder clear() {
+          super.clear();
+          key_ = "";
+          bitField0_ = (bitField0_ & ~0x00000001);
+          value_ = com.google.protobuf.ByteString.EMPTY;
+          bitField0_ = (bitField0_ & ~0x00000002);
+          return this;
+        }
+        
+        public Builder clone() {
+          return create().mergeFrom(buildPartial());
+        }
+        
+        public com.google.protobuf.Descriptors.Descriptor
+            getDescriptorForType() {
+          return org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.getDescriptor();
+        }
+        
+        public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry getDefaultInstanceForType() {
+          return org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.getDefaultInstance();
+        }
+        
+        public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry build() {
+          org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry result = buildPartial();
+          if (!result.isInitialized()) {
+            throw newUninitializedMessageException(result);
+          }
+          return result;
+        }
+        
+        private org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry buildParsed()
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry result = buildPartial();
+          if (!result.isInitialized()) {
+            throw newUninitializedMessageException(
+              result).asInvalidProtocolBufferException();
+          }
+          return result;
+        }
+        
+        public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry buildPartial() {
+          org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry result = new org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry(this);
+          int from_bitField0_ = bitField0_;
+          int to_bitField0_ = 0;
+          if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+            to_bitField0_ |= 0x00000001;
+          }
+          result.key_ = key_;
+          if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+            to_bitField0_ |= 0x00000002;
+          }
+          result.value_ = value_;
+          result.bitField0_ = to_bitField0_;
+          onBuilt();
+          return result;
+        }
+        
+        public Builder mergeFrom(com.google.protobuf.Message other) {
+          if (other instanceof org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry) {
+            return mergeFrom((org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry)other);
+          } else {
+            super.mergeFrom(other);
+            return this;
+          }
+        }
+        
+        public Builder mergeFrom(org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry other) {
+          if (other == org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.getDefaultInstance()) return this;
+          if (other.hasKey()) {
+            setKey(other.getKey());
+          }
+          if (other.hasValue()) {
+            setValue(other.getValue());
+          }
+          this.mergeUnknownFields(other.getUnknownFields());
+          return this;
+        }
+        
+        public final boolean isInitialized() {
+          return true;
+        }
+        
+        public Builder mergeFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+          com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+            com.google.protobuf.UnknownFieldSet.newBuilder(
+              this.getUnknownFields());
+          while (true) {
+            int tag = input.readTag();
+            switch (tag) {
+              case 0:
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              default: {
+                if (!parseUnknownField(input, unknownFields,
+                                       extensionRegistry, tag)) {
+                  this.setUnknownFields(unknownFields.build());
+                  onChanged();
+                  return this;
+                }
+                break;
+              }
+              case 10: {
+                bitField0_ |= 0x00000001;
+                key_ = input.readBytes();
+                break;
+              }
+              case 18: {
+                bitField0_ |= 0x00000002;
+                value_ = input.readBytes();
+                break;
+              }
+            }
+          }
+        }
+        
+        private int bitField0_;
+        
+        // optional string key = 1;
+        private java.lang.Object key_ = "";
+        public boolean hasKey() {
+          return ((bitField0_ & 0x00000001) == 0x00000001);
+        }
+        public String getKey() {
+          java.lang.Object ref = key_;
+          if (!(ref instanceof String)) {
+            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+            key_ = s;
+            return s;
+          } else {
+            return (String) ref;
+          }
+        }
+        public Builder setKey(String value) {
+          if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+          key_ = value;
+          onChanged();
+          return this;
+        }
+        public Builder clearKey() {
+          bitField0_ = (bitField0_ & ~0x00000001);
+          key_ = getDefaultInstance().getKey();
+          onChanged();
+          return this;
+        }
+        void setKey(com.google.protobuf.ByteString value) {
+          bitField0_ |= 0x00000001;
+          key_ = value;
+          onChanged();
+        }
+        
+        // optional bytes value = 2;
+        private com.google.protobuf.ByteString value_ = com.google.protobuf.ByteString.EMPTY;
+        public boolean hasValue() {
+          return ((bitField0_ & 0x00000002) == 0x00000002);
+        }
+        public com.google.protobuf.ByteString getValue() {
+          return value_;
+        }
+        public Builder setValue(com.google.protobuf.ByteString value) {
+          if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+          value_ = value;
+          onChanged();
+          return this;
+        }
+        public Builder clearValue() {
+          bitField0_ = (bitField0_ & ~0x00000002);
+          value_ = getDefaultInstance().getValue();
+          onChanged();
+          return this;
+        }
+        
+        // @@protoc_insertion_point(builder_scope:LedgerMetadataFormat.cMetadataMapEntry)
+      }
+      
+      static {
+        defaultInstance = new cMetadataMapEntry(true);
+        defaultInstance.initFields();
+      }
+      
+      // @@protoc_insertion_point(class_scope:LedgerMetadataFormat.cMetadataMapEntry)
+    }
+    
     private int bitField0_;
     // required int32 quorumSize = 1;
     public static final int QUORUMSIZE_FIELD_NUMBER = 1;
@@ -795,6 +1240,27 @@ public final class DataFormats {
       return ctime_;
     }
     
+    // repeated .LedgerMetadataFormat.cMetadataMapEntry customMetadata = 11;
+    public static final int CUSTOMMETADATA_FIELD_NUMBER = 11;
+    private java.util.List<org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry> customMetadata_;
+    public java.util.List<org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry> getCustomMetadataList() {
+      return customMetadata_;
+    }
+    public java.util.List<? extends org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder> 
+        getCustomMetadataOrBuilderList() {
+      return customMetadata_;
+    }
+    public int getCustomMetadataCount() {
+      return customMetadata_.size();
+    }
+    public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry getCustomMetadata(int index) {
+      return customMetadata_.get(index);
+    }
+    public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder getCustomMetadataOrBuilder(
+        int index) {
+      return customMetadata_.get(index);
+    }
+    
     private void initFields() {
       quorumSize_ = 0;
       ensembleSize_ = 0;
@@ -806,6 +1272,7 @@ public final class DataFormats {
       password_ = com.google.protobuf.ByteString.EMPTY;
       ackQuorumSize_ = 0;
       ctime_ = 0L;
+      customMetadata_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -871,6 +1338,9 @@ public final class DataFormats {
       if (((bitField0_ & 0x00000100) == 0x00000100)) {
         output.writeInt64(10, ctime_);
       }
+      for (int i = 0; i < customMetadata_.size(); i++) {
+        output.writeMessage(11, customMetadata_.get(i));
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -920,6 +1390,10 @@ public final class DataFormats {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(10, ctime_);
       }
+      for (int i = 0; i < customMetadata_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(11, customMetadata_.get(i));
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1037,6 +1511,7 @@ public final class DataFormats {
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
           getSegmentFieldBuilder();
+          getCustomMetadataFieldBuilder();
         }
       }
       private static Builder create() {
@@ -1069,6 +1544,12 @@ public final class DataFormats {
         bitField0_ = (bitField0_ & ~0x00000100);
         ctime_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000200);
+        if (customMetadataBuilder_ == null) {
+          customMetadata_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000400);
+        } else {
+          customMetadataBuilder_.clear();
+        }
         return this;
       }
       
@@ -1152,6 +1633,15 @@ public final class DataFormats {
           to_bitField0_ |= 0x00000100;
         }
         result.ctime_ = ctime_;
+        if (customMetadataBuilder_ == null) {
+          if (((bitField0_ & 0x00000400) == 0x00000400)) {
+            customMetadata_ = java.util.Collections.unmodifiableList(customMetadata_);
+            bitField0_ = (bitField0_ & ~0x00000400);
+          }
+          result.customMetadata_ = customMetadata_;
+        } else {
+          result.customMetadata_ = customMetadataBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1221,6 +1711,32 @@ public final class DataFormats {
         if (other.hasCtime()) {
           setCtime(other.getCtime());
         }
+        if (customMetadataBuilder_ == null) {
+          if (!other.customMetadata_.isEmpty()) {
+            if (customMetadata_.isEmpty()) {
+              customMetadata_ = other.customMetadata_;
+              bitField0_ = (bitField0_ & ~0x00000400);
+            } else {
+              ensureCustomMetadataIsMutable();
+              customMetadata_.addAll(other.customMetadata_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.customMetadata_.isEmpty()) {
+            if (customMetadataBuilder_.isEmpty()) {
+              customMetadataBuilder_.dispose();
+              customMetadataBuilder_ = null;
+              customMetadata_ = other.customMetadata_;
+              bitField0_ = (bitField0_ & ~0x00000400);
+              customMetadataBuilder_ = 
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getCustomMetadataFieldBuilder() : null;
+            } else {
+              customMetadataBuilder_.addAllMessages(other.customMetadata_);
+            }
+          }
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1337,6 +1853,12 @@ public final class DataFormats {
               ctime_ = input.readInt64();
               break;
             }
+            case 90: {
+              org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder subBuilder = org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addCustomMetadata(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -1727,6 +2249,192 @@ public final class DataFormats {
         return this;
       }
       
+      // repeated .LedgerMetadataFormat.cMetadataMapEntry customMetadata = 11;
+      private java.util.List<org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry> customMetadata_ =
+        java.util.Collections.emptyList();
+      private void ensureCustomMetadataIsMutable() {
+        if (!((bitField0_ & 0x00000400) == 0x00000400)) {
+          customMetadata_ = new java.util.ArrayList<org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry>(customMetadata_);
+          bitField0_ |= 0x00000400;
+         }
+      }
+      
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder> customMetadataBuilder_;
+      
+      public java.util.List<org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry> getCustomMetadataList() {
+        if (customMetadataBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(customMetadata_);
+        } else {
+          return customMetadataBuilder_.getMessageList();
+        }
+      }
+      public int getCustomMetadataCount() {
+        if (customMetadataBuilder_ == null) {
+          return customMetadata_.size();
+        } else {
+          return customMetadataBuilder_.getCount();
+        }
+      }
+      public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry getCustomMetadata(int index) {
+        if (customMetadataBuilder_ == null) {
+          return customMetadata_.get(index);
+        } else {
+          return customMetadataBuilder_.getMessage(index);
+        }
+      }
+      public Builder setCustomMetadata(
+          int index, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry value) {
+        if (customMetadataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureCustomMetadataIsMutable();
+          customMetadata_.set(index, value);
+          onChanged();
+        } else {
+          customMetadataBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      public Builder setCustomMetadata(
+          int index, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder builderForValue) {
+        if (customMetadataBuilder_ == null) {
+          ensureCustomMetadataIsMutable();
+          customMetadata_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          customMetadataBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      public Builder addCustomMetadata(org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry value) {
+        if (customMetadataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureCustomMetadataIsMutable();
+          customMetadata_.add(value);
+          onChanged();
+        } else {
+          customMetadataBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      public Builder addCustomMetadata(
+          int index, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry value) {
+        if (customMetadataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureCustomMetadataIsMutable();
+          customMetadata_.add(index, value);
+          onChanged();
+        } else {
+          customMetadataBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      public Builder addCustomMetadata(
+          org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder builderForValue) {
+        if (customMetadataBuilder_ == null) {
+          ensureCustomMetadataIsMutable();
+          customMetadata_.add(builderForValue.build());
+          onChanged();
+        } else {
+          customMetadataBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      public Builder addCustomMetadata(
+          int index, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder builderForValue) {
+        if (customMetadataBuilder_ == null) {
+          ensureCustomMetadataIsMutable();
+          customMetadata_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          customMetadataBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      public Builder addAllCustomMetadata(
+          java.lang.Iterable<? extends org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry> values) {
+        if (customMetadataBuilder_ == null) {
+          ensureCustomMetadataIsMutable();
+          super.addAll(values, customMetadata_);
+          onChanged();
+        } else {
+          customMetadataBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      public Builder clearCustomMetadata() {
+        if (customMetadataBuilder_ == null) {
+          customMetadata_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000400);
+          onChanged();
+        } else {
+          customMetadataBuilder_.clear();
+        }
+        return this;
+      }
+      public Builder removeCustomMetadata(int index) {
+        if (customMetadataBuilder_ == null) {
+          ensureCustomMetadataIsMutable();
+          customMetadata_.remove(index);
+          onChanged();
+        } else {
+          customMetadataBuilder_.remove(index);
+        }
+        return this;
+      }
+      public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder getCustomMetadataBuilder(
+          int index) {
+        return getCustomMetadataFieldBuilder().getBuilder(index);
+      }
+      public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder getCustomMetadataOrBuilder(
+          int index) {
+        if (customMetadataBuilder_ == null) {
+          return customMetadata_.get(index);  } else {
+          return customMetadataBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      public java.util.List<? extends org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder> 
+           getCustomMetadataOrBuilderList() {
+        if (customMetadataBuilder_ != null) {
+          return customMetadataBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(customMetadata_);
+        }
+      }
+      public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder addCustomMetadataBuilder() {
+        return getCustomMetadataFieldBuilder().addBuilder(
+            org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.getDefaultInstance());
+      }
+      public org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder addCustomMetadataBuilder(
+          int index) {
+        return getCustomMetadataFieldBuilder().addBuilder(
+            index, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.getDefaultInstance());
+      }
+      public java.util.List<org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder> 
+           getCustomMetadataBuilderList() {
+        return getCustomMetadataFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder> 
+          getCustomMetadataFieldBuilder() {
+        if (customMetadataBuilder_ == null) {
+          customMetadataBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder, org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntryOrBuilder>(
+                  customMetadata_,
+                  ((bitField0_ & 0x00000400) == 0x00000400),
+                  getParentForChildren(),
+                  isClean());
+          customMetadata_ = null;
+        }
+        return customMetadataBuilder_;
+      }
+      
       // @@protoc_insertion_point(builder_scope:LedgerMetadataFormat)
     }
     
@@ -4017,6 +4725,11 @@ public final class DataFormats {
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_LedgerMetadataFormat_Segment_fieldAccessorTable;
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_LedgerMetadataFormat_cMetadataMapEntry_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_LedgerMetadataFormat_cMetadataMapEntry_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_LedgerRereplicationLayoutFormat_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -4050,7 +4763,7 @@ public final class DataFormats {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n src/main/proto/DataFormats.proto\"\301\003\n\024L" +
+      "\n src/main/proto/DataFormats.proto\"\263\004\n\024L" +
       "edgerMetadataFormat\022\022\n\nquorumSize\030\001 \002(\005\022" +
       "\024\n\014ensembleSize\030\002 \002(\005\022\016\n\006length\030\003 \002(\003\022\023\n" +
       "\013lastEntryId\030\004 \001(\003\0220\n\005state\030\005 \002(\0162\033.Ledg" +
@@ -4058,19 +4771,21 @@ public final class DataFormats {
       "\006 \003(\0132\035.LedgerMetadataFormat.Segment\0224\n\n" +
       "digestType\030\007 \001(\0162 .LedgerMetadataFormat." +
       "DigestType\022\020\n\010password\030\010 \001(\014\022\025\n\rackQuoru" +
-      "mSize\030\t \001(\005\022\r\n\005ctime\030\n \001(\003\0327\n\007Segment\022\026\n" +
-      "\016ensembleMember\030\001 \003(\t\022\024\n\014firstEntryId\030\002 ",
-      "\002(\003\".\n\005State\022\010\n\004OPEN\020\001\022\017\n\013IN_RECOVERY\020\002\022" +
-      "\n\n\006CLOSED\020\003\"!\n\nDigestType\022\t\n\005CRC32\020\001\022\010\n\004" +
-      "HMAC\020\002\"@\n\037LedgerRereplicationLayoutForma" +
-      "t\022\014\n\004type\030\001 \002(\t\022\017\n\007version\030\002 \002(\005\".\n\033Unde" +
-      "rreplicatedLedgerFormat\022\017\n\007replica\030\001 \003(\t" +
-      "\"^\n\014CookieFormat\022\022\n\nbookieHost\030\001 \002(\t\022\022\n\n" +
-      "journalDir\030\002 \002(\t\022\022\n\nledgerDirs\030\003 \002(\t\022\022\n\n" +
-      "instanceId\030\004 \001(\t\"\"\n\016LockDataFormat\022\020\n\010bo" +
-      "okieId\030\001 \001(\t\"%\n\021AuditorVoteFormat\022\020\n\010boo" +
-      "kieId\030\001 \001(\tB\037\n\033org.apache.bookkeeper.pro",
-      "toH\001"
+      "mSize\030\t \001(\005\022\r\n\005ctime\030\n \001(\003\022?\n\016customMeta" +
+      "data\030\013 \003(\0132\'.LedgerMetadataFormat.cMetad",
+      "ataMapEntry\0327\n\007Segment\022\026\n\016ensembleMember" +
+      "\030\001 \003(\t\022\024\n\014firstEntryId\030\002 \002(\003\032/\n\021cMetadat" +
+      "aMapEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\014\".\n" +
+      "\005State\022\010\n\004OPEN\020\001\022\017\n\013IN_RECOVERY\020\002\022\n\n\006CLO" +
+      "SED\020\003\"!\n\nDigestType\022\t\n\005CRC32\020\001\022\010\n\004HMAC\020\002" +
+      "\"@\n\037LedgerRereplicationLayoutFormat\022\014\n\004t" +
+      "ype\030\001 \002(\t\022\017\n\007version\030\002 \002(\005\".\n\033Underrepli" +
+      "catedLedgerFormat\022\017\n\007replica\030\001 \003(\t\"^\n\014Co" +
+      "okieFormat\022\022\n\nbookieHost\030\001 \002(\t\022\022\n\njourna" +
+      "lDir\030\002 \002(\t\022\022\n\nledgerDirs\030\003 \002(\t\022\022\n\ninstan",
+      "ceId\030\004 \001(\t\"\"\n\016LockDataFormat\022\020\n\010bookieId" +
+      "\030\001 \001(\t\"%\n\021AuditorVoteFormat\022\020\n\010bookieId\030" +
+      "\001 \001(\tB\037\n\033org.apache.bookkeeper.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4082,7 +4797,7 @@ public final class DataFormats {
           internal_static_LedgerMetadataFormat_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_LedgerMetadataFormat_descriptor,
-              new java.lang.String[] { "QuorumSize", "EnsembleSize", "Length", "LastEntryId", "State", "Segment", "DigestType", "Password", "AckQuorumSize", "Ctime", },
+              new java.lang.String[] { "QuorumSize", "EnsembleSize", "Length", "LastEntryId", "State", "Segment", "DigestType", "Password", "AckQuorumSize", "Ctime", "CustomMetadata", },
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.class,
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.Builder.class);
           internal_static_LedgerMetadataFormat_Segment_descriptor =
@@ -4093,6 +4808,14 @@ public final class DataFormats {
               new java.lang.String[] { "EnsembleMember", "FirstEntryId", },
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.Segment.class,
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.Segment.Builder.class);
+          internal_static_LedgerMetadataFormat_cMetadataMapEntry_descriptor =
+            internal_static_LedgerMetadataFormat_descriptor.getNestedTypes().get(1);
+          internal_static_LedgerMetadataFormat_cMetadataMapEntry_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_LedgerMetadataFormat_cMetadataMapEntry_descriptor,
+              new java.lang.String[] { "Key", "Value", },
+              org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.class,
+              org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.cMetadataMapEntry.Builder.class);
           internal_static_LedgerRereplicationLayoutFormat_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_LedgerRereplicationLayoutFormat_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6d71b829/bookkeeper-server/src/main/proto/DataFormats.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/DataFormats.proto b/bookkeeper-server/src/main/proto/DataFormats.proto
index 6d97c3a..b4ce86d 100644
--- a/bookkeeper-server/src/main/proto/DataFormats.proto
+++ b/bookkeeper-server/src/main/proto/DataFormats.proto
@@ -50,6 +50,12 @@ message LedgerMetadataFormat {
     optional int32 ackQuorumSize = 9;
     
     optional int64 ctime = 10;
+
+    message cMetadataMapEntry {
+        optional string key = 1;
+        optional bytes value = 2;
+    }
+    repeated cMetadataMapEntry customMetadata = 11;
 }
 
 message LedgerRereplicationLayoutFormat {
@@ -83,4 +89,4 @@ message LockDataFormat {
  */
 message AuditorVoteFormat {
     optional string bookieId = 1;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6d71b829/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 692c480..a5fbe24 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -24,6 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Random;
+import java.util.Map;
+import java.util.UUID;
+import java.util.HashMap;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -36,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 
+
 /**
  * Testing ledger write entry cases
  */
@@ -173,6 +177,52 @@ public class BookieWriteLedgerTest extends
     }
 
     /**
+     * Verify the functionality of Ledger create which accepts customMetadata as input.
+     * Also verifies that the data written is read back properly.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateWithCustomMetadata() throws Exception {
+        // Create a ledger
+        long ledgerId;
+        int maxLedgers = 10;
+        for (int i = 0; i < maxLedgers; i++) {
+            Map<String, byte[]> inputCustomMetadataMap = new HashMap<String, byte[]>();
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            // each ledger has different number of key, value pairs.
+            for (int j = 0; j < i; j++) {
+                inputCustomMetadataMap.put("key" + j, UUID.randomUUID().toString().getBytes());
+            }
+
+            if (i < maxLedgers/2) {
+                // 0 to 4 test with createLedger interface
+                lh = bkc.createLedger(5, 3, 2, digestType, ledgerPassword, inputCustomMetadataMap);
+                ledgerId = lh.getId();
+                lh.addEntry(entry.array());
+            } else {
+                // 5 to 9 test with createLedgerAdv interface
+                lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword, inputCustomMetadataMap);
+                ledgerId = lh.getId();
+                lh.addEntry(0, entry.array());
+            }
+            lh.close();
+
+            // now reopen the ledger; this should fetch all the metadata stored on zk
+            // and the customMetadata written and read should match
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            Map<String, byte[]> outputCustomMetadataMap = lh.getCustomMetadata();
+            assertTrue("Can't retrieve proper Custom Data",
+                       LedgerMetadata.areByteArrayValMapsEqual(inputCustomMetadataMap, outputCustomMetadataMap));
+            lh.close();
+            bkc.deleteLedger(ledgerId);
+        }
+    }
+
+    /**
      * Verify asynchronous writing when few bookie failures in last ensemble.
      */
     @Test(timeout=60000)


Mime
View raw message