bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [19/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
Date Wed, 16 Mar 2016 03:44:29 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java
----------------------------------------------------------------------
diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java
deleted file mode 100644
index 898e8b1..0000000
--- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MapUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.protoextensions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MapUtils {
-
-    static final Logger logger = LoggerFactory.getLogger(MapUtils.class);
-
-    public static String toString(PubSubProtocol.Map map) {
-        StringBuilder sb = new StringBuilder();
-        int numEntries = map.getEntriesCount();
-        for (int i=0; i<numEntries; i++) {
-            PubSubProtocol.Map.Entry entry = map.getEntries(i);
-            String key = entry.getKey();
-            ByteString value = entry.getValue();
-            sb.append(key).append('=').append(value.toStringUtf8());
-            if (i != (numEntries - 1)) {
-                sb.append(',');
-            }
-        }
-        return sb.toString();
-    }
-
-    public static Map<String, ByteString> buildMap(PubSubProtocol.Map protoMap) {
-        Map<String, ByteString> javaMap = new HashMap<String, ByteString>();
-
-        int numEntries = protoMap.getEntriesCount();
-        for (int i=0; i<numEntries; i++) {
-            PubSubProtocol.Map.Entry entry = protoMap.getEntries(i);
-            String key = entry.getKey();
-            if (javaMap.containsKey(key)) {
-                ByteString preValue = javaMap.get(key);
-                logger.warn("Key " + key + " has already been defined as value : " + preValue.toStringUtf8());
-            } else {
-                javaMap.put(key, entry.getValue());
-            }
-        }
-        return javaMap;
-    }
-
-    public static PubSubProtocol.Map.Builder buildMapBuilder(Map<String, ByteString> javaMap) {
-        PubSubProtocol.Map.Builder mapBuilder = PubSubProtocol.Map.newBuilder();
-
-        for (Map.Entry<String, ByteString> entry : javaMap.entrySet()) {
-            mapBuilder.addEntries(PubSubProtocol.Map.Entry.newBuilder().setKey(entry.getKey())
-                                                .setValue(entry.getValue()));
-        }
-        return mapBuilder;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java
----------------------------------------------------------------------
diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java
deleted file mode 100644
index 9ceec26..0000000
--- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.protoextensions;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.exceptions.PubSubException.UnexpectedConditionException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-
-public class MessageIdUtils {
-
-    public static String msgIdToReadableString(MessageSeqId seqId) {
-        StringBuilder sb = new StringBuilder();
-        sb.append("local:");
-        sb.append(seqId.getLocalComponent());
-
-        String separator = ";";
-        for (RegionSpecificSeqId regionId : seqId.getRemoteComponentsList()) {
-            sb.append(separator);
-            sb.append(regionId.getRegion().toStringUtf8());
-            sb.append(':');
-            sb.append(regionId.getSeqId());
-        }
-        return sb.toString();
-    }
-
-    public static Map<ByteString, RegionSpecificSeqId> inMapForm(MessageSeqId msi) {
-        Map<ByteString, RegionSpecificSeqId> map = new HashMap<ByteString, RegionSpecificSeqId>();
-
-        for (RegionSpecificSeqId lmsid : msi.getRemoteComponentsList()) {
-            map.put(lmsid.getRegion(), lmsid);
-        }
-
-        return map;
-    }
-
-    public static boolean areEqual(MessageSeqId m1, MessageSeqId m2) {
-
-        if (m1.getLocalComponent() != m2.getLocalComponent()) {
-            return false;
-        }
-
-        if (m1.getRemoteComponentsCount() != m2.getRemoteComponentsCount()) {
-            return false;
-        }
-
-        Map<ByteString, RegionSpecificSeqId> m2map = inMapForm(m2);
-
-        for (RegionSpecificSeqId lmsid1 : m1.getRemoteComponentsList()) {
-            RegionSpecificSeqId lmsid2 = m2map.get(lmsid1.getRegion());
-            if (lmsid2 == null) {
-                return false;
-            }
-            if (lmsid1.getSeqId() != lmsid2.getSeqId()) {
-                return false;
-            }
-        }
-
-        return true;
-
-    }
-
-    public static Message mergeLocalSeqId(Message.Builder messageBuilder, long localSeqId) {
-        MessageSeqId.Builder msidBuilder = MessageSeqId.newBuilder(messageBuilder.getMsgId());
-        msidBuilder.setLocalComponent(localSeqId);
-        messageBuilder.setMsgId(msidBuilder);
-        return messageBuilder.build();
-    }
-
-    public static Message mergeLocalSeqId(Message orginalMessage, long localSeqId) {
-        return mergeLocalSeqId(Message.newBuilder(orginalMessage), localSeqId);
-    }
-
-    /**
-     * Compares two seq numbers represented as lists of longs.
-     *
-     * @param l1
-     * @param l2
-     * @return 1 if the l1 is greater, 0 if they are equal, -1 if l2 is greater
-     * @throws UnexpectedConditionException
-     *             If the lists are of unequal length
-     */
-    public static int compare(List<Long> l1, List<Long> l2) throws UnexpectedConditionException {
-        if (l1.size() != l2.size()) {
-            throw new UnexpectedConditionException("Seq-ids being compared have different sizes: " + l1.size()
-                                                   + " and " + l2.size());
-        }
-
-        for (int i = 0; i < l1.size(); i++) {
-            long v1 = l1.get(i);
-            long v2 = l2.get(i);
-
-            if (v1 == v2) {
-                continue;
-            }
-
-            return v1 > v2 ? 1 : -1;
-        }
-
-        // All components equal
-        return 0;
-    }
-
-    /**
-     * Adds to newIdBuilder the element-wise maximum of the remote components
-     * of id1 and id2, treating each as a sparse vector keyed by region.
-     */
-    public static void takeRegionMaximum(MessageSeqId.Builder newIdBuilder, MessageSeqId id1, MessageSeqId id2) {
-        Map<ByteString, RegionSpecificSeqId> id2Map = MessageIdUtils.inMapForm(id2);
-
-        for (RegionSpecificSeqId rrsid1 : id1.getRemoteComponentsList()) {
-            ByteString region = rrsid1.getRegion();
-
-            RegionSpecificSeqId rssid2 = id2Map.get(region);
-
-            if (rssid2 == null) {
-                newIdBuilder.addRemoteComponents(rrsid1);
-                continue;
-            }
-
-            newIdBuilder.addRemoteComponents((rrsid1.getSeqId() > rssid2.getSeqId()) ? rrsid1 : rssid2);
-
-            // remove from map
-            id2Map.remove(region);
-        }
-
-        // now take the remaining components in the map and add them
-        for (RegionSpecificSeqId rssid2 : id2Map.values()) {
-            newIdBuilder.addRemoteComponents(rssid2);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
----------------------------------------------------------------------
diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
deleted file mode 100644
index 5a9cdf7..0000000
--- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.protoextensions;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse;
-
-public class PubSubResponseUtils {
-
-    /**
-     * Change here if bumping up the version number that the server sends back
-     */
-    public final static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE;
-
-    static PubSubResponse.Builder getBasicBuilder(StatusCode status) {
-        return PubSubResponse.newBuilder().setProtocolVersion(serverVersion).setStatusCode(status);
-    }
-
-    public static PubSubResponse getSuccessResponse(long txnId) {
-        return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId).build();
-    }
-
-    public static PubSubResponse getSuccessResponse(long txnId, ResponseBody respBody) {
-        return getBasicBuilder(StatusCode.SUCCESS).setTxnId(txnId)
-               .setResponseBody(respBody).build();
-    }
-
-    public static PubSubResponse getResponseForException(PubSubException e, long txnId) {
-        return getBasicBuilder(e.getCode()).setStatusMsg(e.getMessage()).setTxnId(txnId).build();
-    }
-
-    public static PubSubResponse getResponseForSubscriptionEvent(ByteString topic,
-                                                                 ByteString subscriberId,
-                                                                 SubscriptionEvent event) {
-        SubscriptionEventResponse.Builder eventBuilder =
-            SubscriptionEventResponse.newBuilder().setEvent(event);
-        ResponseBody.Builder respBuilder =
-            ResponseBody.newBuilder().setSubscriptionEvent(eventBuilder);
-        PubSubResponse response = PubSubResponse.newBuilder()
-                                  .setProtocolVersion(ProtocolVersion.VERSION_ONE)
-                                  .setStatusCode(StatusCode.SUCCESS).setTxnId(0)
-                                  .setTopic(topic).setSubscriberId(subscriberId)
-                                  .setResponseBody(respBuilder).build();
-        return response;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java
----------------------------------------------------------------------
diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java b/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java
deleted file mode 100644
index e195ace..0000000
--- a/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.protoextensions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SubscriptionStateUtils {
-
-    static final Logger logger = LoggerFactory.getLogger(SubscriptionStateUtils.class);
-
-    // For now, to differentiate hub subscribers from local ones, the
-    // subscriberId will be prepended with a hard-coded prefix. Local
-    // subscribers will validate that the subscriberId used cannot start with
-    // this prefix. This is only used internally by the hub subscribers.
-    public static final String HUB_SUBSCRIBER_PREFIX = "__";
-
-    public static SubscriptionData parseSubscriptionData(byte[] data)
-    throws InvalidProtocolBufferException {
-        try {
-            return SubscriptionData.parseFrom(data);
-        } catch (InvalidProtocolBufferException ex) {
-            logger.info("Failed to parse data as SubscriptionData. Fall backward to parse it as SubscriptionState for backward compatability.");
-            // backward compatibility
-            SubscriptionState state = SubscriptionState.parseFrom(data);
-            return SubscriptionData.newBuilder().setState(state).build();
-        }
-    }
-
-    public static String toString(SubscriptionData data) {
-        StringBuilder sb = new StringBuilder();
-        if (data.hasState()) {
-            sb.append("State : { ").append(toString(data.getState())).append(" };");
-        }
-        if (data.hasPreferences()) {
-            sb.append("Preferences : { ").append(toString(data.getPreferences())).append(" };");
-        }
-        return sb.toString();
-    }
-
-    public static String toString(SubscriptionState state) {
-        StringBuilder sb = new StringBuilder();
-        sb.append("consumeSeqId: " + MessageIdUtils.msgIdToReadableString(state.getMsgId()));
-        return sb.toString();
-    }
-
-    public static String toString(SubscriptionPreferences preferences) {
-        StringBuilder sb = new StringBuilder();
-        sb.append("System Preferences : [");
-        if (preferences.hasMessageBound()) {
-            sb.append("(messageBound=").append(preferences.getMessageBound())
-              .append(")");
-        }
-        sb.append("]");
-        if (preferences.hasOptions()) {
-            sb.append(", Customized Preferences : [");
-            sb.append(MapUtils.toString(preferences.getOptions()));
-            sb.append("]");
-        }
-        return sb.toString();
-    }
-
-    public static boolean isHubSubscriber(ByteString subscriberId) {
-        return subscriberId.toStringUtf8().startsWith(HUB_SUBSCRIBER_PREFIX);
-    }
-
-    public static Map<String, ByteString> buildUserOptions(SubscriptionPreferences preferences) {
-        if (preferences.hasOptions()) {
-            return MapUtils.buildMap(preferences.getOptions());
-        } else {
-            return new HashMap<String, ByteString>();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
----------------------------------------------------------------------
diff --git a/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto b/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
deleted file mode 100644
index c31f0a6..0000000
--- a/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.hedwig.protocol";
-option optimize_for = SPEED;
-package Hedwig; 
-
-enum ProtocolVersion{
-    VERSION_ONE = 1;
-}
-
-// common structure to store header or properties
-message Map {
-    message Entry {
-        optional string key  = 1;
-        optional bytes value = 2;
-    }
-    repeated Entry entries = 1;
-}
-
-// message header
-message MessageHeader {
-    // user customized fields used for message filter
-    optional Map properties = 1;
-    // following are system properties in message header
-    optional string messageType = 2;
-}
-
-/*
- * this is the structure that will be serialized
- */
-message Message {
-    required bytes body = 1;
-    optional bytes srcRegion = 2;
-    optional MessageSeqId msgId = 3;
-    // message header
-    optional MessageHeader header = 4;
-}
-
-message RegionSpecificSeqId {
-    required bytes region = 1;
-    required uint64 seqId = 2;
-}
-    
-message MessageSeqId{
-    optional uint64 localComponent = 1;
-    repeated RegionSpecificSeqId remoteComponents = 2;    
-}
-
-enum OperationType{
-    PUBLISH = 0;
-    SUBSCRIBE = 1;
-    CONSUME = 2;
-    UNSUBSCRIBE = 3;
-    
-    //the following two are only used for the hedwig proxy
-    START_DELIVERY = 4;
-    STOP_DELIVERY = 5;
-    // end for requests only used for hedwig proxy
-
-    CLOSESUBSCRIPTION = 6;
-}
-
-/* A PubSubRequest is just a union of the various request types, with
- * an enum telling us which type it is. The same can also be done through 
- * extensions. We need one request type that we will deserialize into on 
- * the server side.
- */
-message PubSubRequest{
-    
-    required ProtocolVersion protocolVersion = 1;
-    required OperationType type = 2;
-    repeated bytes triedServers = 3;
-    required uint64 txnId = 4;
-    optional bool shouldClaim = 5;
-    required bytes topic = 6;
-    //any authentication stuff and other general stuff here
-    
-    
-    /* one entry for each type of request */
-    optional PublishRequest publishRequest = 52; 
-    optional SubscribeRequest subscribeRequest = 53;
-    optional ConsumeRequest consumeRequest = 54;
-    optional UnsubscribeRequest unsubscribeRequest = 55;
-    optional StopDeliveryRequest stopDeliveryRequest = 56;
-    optional StartDeliveryRequest startDeliveryRequest = 57;
-    optional CloseSubscriptionRequest closeSubscriptionRequest = 58;
-}
-
-
-
-message PublishRequest{
-    required Message msg = 2;
-}
-
-// record all preferences for a subscription,
-// would be serialized to be stored in meta store
-message SubscriptionPreferences {
-    // user customized subscription options
-    optional Map options = 1;
-
-    ///
-    /// system defined options
-    ///
-
-    // message bound
-    optional uint32 messageBound = 2;
-    // server-side message filter
-    optional string messageFilter = 3;
-    // message window size, this is the maximum number of messages 
-    // which will be delivered without being consumed
-    optional uint32 messageWindowSize = 4;
-}
-
-message SubscribeRequest{
-    required bytes subscriberId = 2;
-
-    enum CreateOrAttach{
-        CREATE = 0;
-        ATTACH = 1;
-        CREATE_OR_ATTACH = 2;
-    };
-    optional CreateOrAttach createOrAttach = 3 [default = CREATE_OR_ATTACH];
-
-    // wait for cross-regional subscriptions to be established before returning
-    optional bool synchronous = 4 [default = false];
-    // @Deprecated. set message bound in SubscriptionPreferences
-    optional uint32 messageBound = 5;
-
-    // subscription options
-    optional SubscriptionPreferences preferences = 6;
-
-    // force attach subscription, which would kill the existing channel
-    // this option doesn't need to be persisted
-    optional bool forceAttach = 7 [default = false];
-}
-
-// used in client only
-// options are stored in SubscriptionPreferences structure
-message SubscriptionOptions {
-    // force attach subscription, which would kill the existing channel
-    // this option doesn't need to be persisted
-    optional bool forceAttach = 1 [default = false];
-    optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
-    optional uint32 messageBound = 3 [default = 0];
-    // user customized subscription options
-    optional Map options = 4;
-    // server-side message filter
-    optional string messageFilter = 5;
-    // message window size, this is the maximum number of messages 
-    // which will be delivered without being consumed
-    optional uint32 messageWindowSize = 6;
-    // enable resubscribe
-    optional bool enableResubscribe = 7 [default = true];
-}
-
-message ConsumeRequest{
-    required bytes subscriberId = 2;    
-    required MessageSeqId msgId = 3;
-    //the msgId is cumulative: all messages up to this id are marked as consumed
-}
-
-message UnsubscribeRequest{
-    required bytes subscriberId = 2;
-}
-
-message CloseSubscriptionRequest {
-    required bytes subscriberId = 2;
-}
-
-message StopDeliveryRequest{
-    required bytes subscriberId = 2;
-}
-
-message StartDeliveryRequest{
-    required bytes subscriberId = 2;
-}
-
-// Identifies an event that happened for a subscription
-enum SubscriptionEvent {
-    // topic has changed ownership (hub server down or topic released)
-    TOPIC_MOVED = 1;
-    // subscription is force closed by other subscribers
-    SUBSCRIPTION_FORCED_CLOSED = 2;
-}
-
-// a response carries an event for a subscription sent to client
-message SubscriptionEventResponse {
-    optional SubscriptionEvent event = 1;
-}
-
-message PubSubResponse{
-    required ProtocolVersion protocolVersion = 1;
-    required StatusCode statusCode = 2;
-    required uint64 txnId = 3;
-
-    optional string statusMsg = 4;
-    //in case of a status code of NOT_RESPONSIBLE_FOR_TOPIC, the status
-    //message will contain the name of the host actually responsible 
-    //for the topic
-    
-    //the following fields are sent in delivered messages
-    optional Message message = 5;
-    optional bytes topic = 6;
-    optional bytes subscriberId = 7;
-
-    // the following fields are sent by other requests
-    optional ResponseBody responseBody = 8;
-}
-
-message PublishResponse {
-    // If the request was a publish request, this was the message Id of the published message.
-    required MessageSeqId publishedMsgId = 1;
-}
-
-message SubscribeResponse {
-    optional SubscriptionPreferences preferences = 2;
-}
-
-message ResponseBody {
-    optional PublishResponse publishResponse = 1;
-    optional SubscribeResponse subscribeResponse = 2;
-    optional SubscriptionEventResponse subscriptionEvent = 3;
-}
-
-
-enum StatusCode{
-    SUCCESS = 0;
-    
-    //client-side errors (4xx)
-    MALFORMED_REQUEST = 401;
-    NO_SUCH_TOPIC = 402;
-    CLIENT_ALREADY_SUBSCRIBED = 403;
-    CLIENT_NOT_SUBSCRIBED = 404;
-    COULD_NOT_CONNECT = 405;
-    TOPIC_BUSY = 406;
-    RESUBSCRIBE_EXCEPTION = 407;
-    
-    //server-side errors (5xx)
-    NOT_RESPONSIBLE_FOR_TOPIC = 501;
-    SERVICE_DOWN = 502;
-    UNCERTAIN_STATE = 503;
-    INVALID_MESSAGE_FILTER = 504;
-
-    //server-side meta manager errors (52x)
-    BAD_VERSION = 520;
-    NO_TOPIC_PERSISTENCE_INFO = 521;
-    TOPIC_PERSISTENCE_INFO_EXISTS = 522;
-    NO_SUBSCRIPTION_STATE = 523;
-    SUBSCRIPTION_STATE_EXISTS = 524;
-    NO_TOPIC_OWNER_INFO = 525;
-    TOPIC_OWNER_INFO_EXISTS = 526;
-
-    //For all unexpected error conditions
-    UNEXPECTED_CONDITION = 600;
-    
-    COMPOSITE = 700;
-}
-  
-//What follows is not the server client protocol, but server-internal structures that are serialized in ZK  
-//They should eventually be moved into the server 
-    
-message SubscriptionState {
-    required MessageSeqId msgId = 1;
-    // @Deprecated.
-    // It is a bad idea to put fields that don't change frequently
-    // together with fields that change frequently
-    // so move it to subscription preferences structure
-    optional uint32 messageBound = 2;
-}
-
-message SubscriptionData {
-    optional SubscriptionState state = 1;
-    optional SubscriptionPreferences preferences = 2;
-}
-
-message LedgerRange{
-    required uint64 ledgerId = 1;
-    optional MessageSeqId endSeqIdIncluded = 2;
-    optional uint64 startSeqIdIncluded = 3;
-}
-
-message LedgerRanges{
-    repeated LedgerRange ranges = 1;
-}
-
-message ManagerMeta {
-    required string managerImpl = 2;
-    required uint32 managerVersion = 3;
-}
-
-message HubInfoData {
-    required string hostname = 2;
-    required uint64 czxid = 3;
-}
-
-message HubLoadData {
-    required uint64 numTopics = 2;
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/hedwig-protocol/src/main/resources/findbugsExclude.xml b/hedwig-protocol/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index 27cd339..0000000
--- a/hedwig-protocol/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,23 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-//-->
-<FindBugsFilter>
-  <Match>
-    <!-- generated code, we can't be held responsible for findbugs in it //-->
-    <Class name="~org\.apache\.hedwig\.protocol\.PubSubProtocol.*" />
-  </Match>
-</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/bin/hedwig
----------------------------------------------------------------------
diff --git a/hedwig-server/bin/hedwig b/hedwig-server/bin/hedwig
deleted file mode 100755
index a2ff83b..0000000
--- a/hedwig-server/bin/hedwig
+++ /dev/null
@@ -1,205 +0,0 @@
-#!/usr/bin/env bash
-#
-#/**
-# * Copyright 2007 The Apache Software Foundation
-# *
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-# check if net.ipv6.bindv6only is set to 1
-bindv6only=$(/sbin/sysctl -n net.ipv6.bindv6only 2> /dev/null)
-if [ -n "$bindv6only" ] && [ "$bindv6only" -eq "1" ]
-then
-  echo "Error: \"net.ipv6.bindv6only\" is set to 1 - Java networking could be broken"
-  echo "For more info (the following page also applies to hedwig): http://wiki.apache.org/hadoop/HadoopIPv6"
-  exit 1
-fi
-
-# See the following page for extensive details on setting
-# up the JVM to accept JMX remote management:
-# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
-# by default we allow local JMX connections
-if [ "x$JMXLOCALONLY" = "x" ]
-then
-    JMXLOCALONLY=false
-fi
-
-if [ "x$JMXDISABLE" = "x" ]
-then
-    echo "JMX enabled by default" >&2
-    # for some reason these two options are necessary on jdk6 on Ubuntu
-    #   according to the docs they are not necessary, but otherwise jconsole cannot
-    #   do a local attach
-    JMX_ARGS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY"
-else
-    echo "JMX disabled by user request" >&2
-fi
-
-BINDIR=`dirname "$0"`
-HW_HOME=`cd $BINDIR/..;pwd`
-
-DEFAULT_CONF=$HW_HOME/conf/hw_server.conf
-DEFAULT_REGION_CLIENT_CONF=$HW_HOME/conf/hw_region_client.conf
-DEFAULT_LOG_CONF=$HW_HOME/conf/log4j.properties
-
-. $HW_HOME/conf/hwenv.sh
-
-# Check for the java to use
-if [[ -z $JAVA_HOME ]]; then
-    JAVA=$(which java)
-    if [ $? = 0 ]; then
-        echo "JAVA_HOME not set, using java from PATH. ($JAVA)"
-    else
-        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
-        exit 1
-    fi
-else
-    JAVA=$JAVA_HOME/bin/java
-fi
-
-RELEASE_JAR=`ls $HW_HOME/hedwig-server-*.jar 2> /dev/null | grep -v tests | tail -1`
-
-if [ $? == 0 ]; then
-    HEDWIG_JAR=$RELEASE_JAR
-fi
-
-BUILT_JAR=`ls $HW_HOME/target/hedwig-server-*.jar 2> /dev/null | grep -v tests | tail -1`
-if [ $? != 0 ] && [ ! -e "$HEDWIG_JAR" ]; then 
-    echo "\nCouldn't find hedwig jar.";
-    echo "Make sure you've run 'mvn package'\n";
-    exit 1;
-elif [ -e "$BUILT_JAR" ]; then
-    HEDWIG_JAR=$BUILT_JAR
-fi
-
-add_maven_deps_to_classpath() {
-    MVN="mvn"
-    if [ "$MAVEN_HOME" != "" ]; then
-	MVN=${MAVEN_HOME}/bin/mvn
-    fi
-    
-    # Need to generate classpath from maven pom. This is costly so generate it
-    # and cache it. Save the file into our target dir so that a mvn clean will
-    # clean it up and force us to create a new one.
-    f="${HW_HOME}/target/cached_classpath.txt"
-    if [ ! -f "${f}" ]
-    then
-	${MVN} -f "${HW_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
-    fi
-    HEDWIG_CLASSPATH=${CLASSPATH}:`cat "${f}"`
-}
-
-if [ -d "$HW_HOME/lib" ]; then
-    for i in $HW_HOME/lib/*.jar; do
-	HEDWIG_CLASSPATH=$HEDWIG_CLASSPATH:$i
-    done
-else
-    add_maven_deps_to_classpath
-fi
-
-hedwig_help() {
-    cat <<EOF
-Usage: hedwig <command>
-where command is one of:
-    server           Run the hedwig server
-    console          Run the hedwig admin console
-    help             This help message
-
-or command is the full name of a class with a defined main() method.
-
-Environment variables:
-   HEDWIG_SERVER_CONF           Hedwig server configuration file (default $DEFAULT_CONF)
-   HEDWIG_REGION_CLIENT_CONF           Configuration file for the hedwig client used by the
-                                region manager (default $DEFAULT_REGION_CLIENT_CONF)
-   HEDWIG_CONSOLE_SERVER_CONF   Server part configuration for hedwig console,
-                                used for metadata management (defaults to HEDWIG_SERVER_CONF)
-   HEDWIG_CONSOLE_CLIENT_CONF   Client part configuration for hedwig console,
-                                used for interacting with hub server.
-   HEDWIG_LOG_CONF              Log4j configuration file (default $DEFAULT_LOG_CONF)
-   HEDWIG_ROOT_LOGGER           Root logger for hedwig
-   HEDWIG_LOG_DIR               Log directory to store log files for hedwig server
-   HEDWIG_LOG_FILE              Log file name
-   HEDWIG_EXTRA_OPTS            Extra options to be passed to the jvm
-
-These variable can also be set in conf/hwenv.sh
-EOF
-}
-
-# if no args specified, show usage
-if [ $# = 0 ]; then
-    hedwig_help;
-    exit 1;
-fi
-
-# get arguments
-COMMAND=$1
-shift
-
-if [ -z "$HEDWIG_SERVER_CONF" ]; then
-    HEDWIG_SERVER_CONF=$DEFAULT_CONF;
-fi
-
-if [ -z "$HEDWIG_REGION_CLIENT_CONF" ]; then
-    HEDWIG_REGION_CLIENT_CONF=$DEFAULT_REGION_CLIENT_CONF;
-fi
-
-if [ -z "$HEDWIG_LOG_CONF" ]; then
-    HEDWIG_LOG_CONF=$DEFAULT_LOG_CONF
-fi
-
-HEDWIG_CLASSPATH="$HEDWIG_JAR:$HEDWIG_CLASSPATH"
-
-if [ "$HEDWIG_LOG_CONF" != "" ]; then
-    HEDWIG_CLASSPATH="`dirname $HEDWIG_LOG_CONF`:$HEDWIG_CLASSPATH"
-    OPTS="$OPTS -Dlog4j.configuration=`basename $HEDWIG_LOG_CONF`"
-fi
-OPTS="-cp $HEDWIG_CLASSPATH $OPTS $HEDWIG_EXTRA_OPTS"
-
-# Disable ipv6 as it can cause issues
-OPTS="$OPTS -Djava.net.preferIPv4Stack=true"
-
-# log directory & file
-HEDWIG_ROOT_LOGGER=${HEDWIG_ROOT_LOGGER:-"INFO,CONSOLE"}
-HEDWIG_LOG_DIR=${HEDWIG_LOG_DIR:-"$HW_HOME/logs"}
-HEDWIG_LOG_FILE=${HEDWIG_LOG_FILE:-"hedwig-server.log"}
-
-# Configure log configuration system properties
-OPTS="$OPTS -Dhedwig.root.logger=$HEDWIG_ROOT_LOGGER"
-OPTS="$OPTS -Dhedwig.log.dir=$HEDWIG_LOG_DIR"
-OPTS="$OPTS -Dhedwig.log.file=$HEDWIG_LOG_FILE"
-
-# Change to HW_HOME to support relative paths
-cd "$BK_HOME"
-if [ $COMMAND == "server" ]; then
-    exec $JAVA $OPTS $JMX_ARGS org.apache.hedwig.server.netty.PubSubServer $HEDWIG_SERVER_CONF $HEDWIG_REGION_CLIENT_CONF $@
-elif [ $COMMAND == "console" ]; then
-    # hedwig console configuration server part
-    if [ -z "$HEDWIG_CONSOLE_SERVER_CONF" ]; then
-        HEDWIG_CONSOLE_SERVER_CONF=$HEDWIG_SERVER_CONF
-    fi
-    # hedwig console configuration client part
-    if [ -n "$HEDWIG_CONSOLE_CLIENT_CONF" ]; then
-        HEDWIG_CONSOLE_CLIENT_OPTIONS="-client-cfg $HEDWIG_CONSOLE_CLIENT_CONF"
-    fi
-    exec $JAVA $OPTS org.apache.hedwig.admin.console.HedwigConsole -server-cfg $HEDWIG_CONSOLE_SERVER_CONF $HEDWIG_CONSOLE_CLIENT_OPTIONS $@
-elif [ $COMMAND == "help" ]; then
-    hedwig_help;
-else
-    exec $JAVA $OPTS $COMMAND $@
-fi
-
-

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/bin/hedwig-daemon.sh
----------------------------------------------------------------------
diff --git a/hedwig-server/bin/hedwig-daemon.sh b/hedwig-server/bin/hedwig-daemon.sh
deleted file mode 100755
index 73eac6f..0000000
--- a/hedwig-server/bin/hedwig-daemon.sh
+++ /dev/null
@@ -1,163 +0,0 @@
-#!/usr/bin/env bash
-#
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-usage() {
-    cat <<EOF
-Usage: hedwig-daemon.sh (start|stop) <command> <args...>
-where command is one of:
-    server           Run the hedwig server
-EOF
-}
-
-
-BINDIR=`dirname "$0"`
-HEDWIG_HOME=`cd $BINDIR/..;pwd`
-
-if [ -f $HEDWIG_HOME/conf/hwenv.sh ]
-then
- . $HEDWIG_HOME/conf/hwenv.sh
-fi
-
-HEDWIG_LOG_DIR=${HEDWIG_LOG_DIR:-"$HEDWIG_HOME/logs"}
-
-HEDWIG_ROOT_LOGGER=${HEDWIG_ROOT_LOGGER:-'INFO,ROLLINGFILE'}
-
-HEDWIG_STOP_TIMEOUT=${HEDWIG_STOP_TIMEOUT:-30}
-
-HEDWIG_PID_DIR=${HEDWIG_PID_DIR:-$HEDWIG_HOME/bin}
-
-if [ $# -lt 2 ]
-then
-    echo "Error: no enough arguments provided."
-    usage
-    exit 1
-fi
-
-startStop=$1
-shift
-command=$1
-shift
-
-case $command in
-    (server)
-        echo "doing $startStop $command ..."
-        ;;
-    (*)
-        echo "Error: unknown service name $command"
-        usage
-        exit 1
-        ;;
-esac
-
-export HEDWIG_LOG_DIR=$HEDWIG_LOG_DIR
-export HEDWIG_ROOT_LOGGER=$HEDWIG_ROOT_LOGGER
-export HEDWIG_LOG_FILE=hedwig-$command-$HOSTNAME.log
-
-pid=$HEDWIG_PID_DIR/hedwig-$command.pid
-out=$HEDWIG_LOG_DIR/hedwig-$command-$HOSTNAME.out
-logfile=$HEDWIG_LOG_DIR/$HEDWIG_LOG_FILE
-
-rotate_out_log ()
-{
-    log=$1;
-    num=5;
-    if [ -n "$2" ]; then
-       num=$2
-    fi
-    if [ -f "$log" ]; then # rotate logs
-        while [ $num -gt 1 ]; do
-            prev=`expr $num - 1`
-            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
-            num=$prev
-        done
-        mv "$log" "$log.$num";
-    fi
-}
-
-mkdir -p "$HEDWIG_LOG_DIR"
-
-case $startStop in
-  (start)
-    if [ -f $pid ]; then
-      if kill -0 `cat $pid` > /dev/null 2>&1; then
-        echo $command running as process `cat $pid`.  Stop it first.
-        exit 1
-      fi
-    fi
-
-    rotate_out_log $out
-    echo starting $command, logging to $logfile
-    hedwig=$HEDWIG_HOME/bin/hedwig
-    nohup $hedwig $command "$@" > "$out" 2>&1 < /dev/null &
-    echo $! > $pid
-    sleep 1; head $out
-    sleep 2;
-    if ! ps -p $! > /dev/null ; then
-      exit 1
-    fi
-    ;;
-
-  (stop)
-    if [ -f $pid ]; then
-      TARGET_PID=`cat $pid`
-      if kill -0 $TARGET_PID > /dev/null 2>&1; then
-        echo stopping $command
-        kill $TARGET_PID
-
-        count=0
-        location=$HEDWIG_LOG_DIR
-        while ps -p $TARGET_PID > /dev/null;
-         do
-          echo "Shutdown is in progress... Please wait..."
-          sleep 1
-          count=`expr $count + 1`
-         
-          if [ "$count" = "$HEDWIG_STOP_TIMEOUT" ]; then
-                break
-          fi
-         done
-        
-        if [ "$count" != "$HEDWIG_STOP_TIMEOUT" ]; then
-                 echo "Shutdown completed."
-                exit 0
-        fi
-                 
-        if kill -0 $TARGET_PID > /dev/null 2>&1; then
-              fileName=$location/$command.out
-              $JAVA_HOME/bin/jstack $TARGET_PID > $fileName
-              echo Thread dumps are taken for analysis at $fileName
-              echo forcefully stopping $command
-              kill -9 $TARGET_PID >/dev/null 2>&1
-              echo Successfully stopped the process
-        fi
-      else
-        echo no $command to stop
-      fi
-      rm $pid
-    else
-      echo no $command to stop
-    fi
-    ;;
-
-  (*)
-    usage
-    exit 1
-    ;;
-esac

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/hw_region_client.conf
----------------------------------------------------------------------
diff --git a/hedwig-server/conf/hw_region_client.conf b/hedwig-server/conf/hw_region_client.conf
deleted file mode 100644
index 9a5592e..0000000
--- a/hedwig-server/conf/hw_region_client.conf
+++ /dev/null
@@ -1,42 +0,0 @@
-#   Licensed to the Apache Software Foundation (ASF) under one or more
-#   contributor license agreements.  See the NOTICE file distributed with
-#   this work for additional information regarding copyright ownership.
-#   The ASF licenses this file to You under the Apache License, Version 2.0
-#   (the "License"); you may not use this file except in compliance with
-#   the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-
-# This is the configuration file for the hedwig client used by the region manager
-
-# This parameter is a boolean flag indicating if communication with the
-# server should be done via SSL for encryption. The Hedwig server hubs also
-# need to be SSL enabled for this to work.
-# ssl_enabled=false
-
-# The maximum message size in bytes
-# max_message_size=2097152
-
-# The maximum number of redirects we permit before signalling an error
-# max_server_redirects=2
-
-# A flag indicating whether the client library should automatically send
-# consume messages to the server
-# auto_send_consume_message_enabled=true
-
-# The number of messages we buffer before sending a consume message
-# to the server
-# consumed_messages_buffer_size=5
-
-# Support for client side throttling.
-# max_outstanding_messages=10
-
-# The timeout in milliseconds before we error out any existing
-# requests
-# server_ack_response_timeout=30000

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/hw_server.conf
----------------------------------------------------------------------
diff --git a/hedwig-server/conf/hw_server.conf b/hedwig-server/conf/hw_server.conf
deleted file mode 100644
index 2ca2d54..0000000
--- a/hedwig-server/conf/hw_server.conf
+++ /dev/null
@@ -1,168 +0,0 @@
-#   Licensed to the Apache Software Foundation (ASF) under one or more
-#   contributor license agreements.  See the NOTICE file distributed with
-#   this work for additional information regarding copyright ownership.
-#   The ASF licenses this file to You under the Apache License, Version 2.0
-#   (the "License"); you may not use this file except in compliance with
-#   the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-
-################################
-# ZooKeeper Settings
-################################
-
-# The ZooKeeper server host(s) for the Hedwig Server to use.
-zk_host=localhost:2181
-
-# The number of milliseconds of each tick in ZooKeeper.
-zk_timeout=2000
-
-################################
-# Hub Server Settings
-################################
-
-# Is the hub server running in standalone mode?
-# Default is false.
-standalone=false
-
-# The port at which the clients will connect.
-server_port=4080
-
-# The SSL port at which the clients will connect (only if SSL is enabled).
-ssl_server_port=9876
-
-# Flag indicating if the server should also operate in SSL mode.
-ssl_enabled=false
-
-# Name of the SSL certificate if available as a resource.
-# The certificate should be in pkcs12 format.
-# cert_name=
-
-# Path to the SSL certificate if available as a file.
-# The certificate should be in pkcs12 format.
-# cert_path=
-
-# Password used for pkcs12 certificate.
-# password=
-
-#######################################
-# Publish and subscription parameters
-#######################################
-# Max Message Size that a hub server could accept
-# max_message_size=1258291 
-
-# Message Sequence Interval to update subscription state to metadata store.
-# Default is 50.
-# consume_interval=50
-
-# Time interval (in seconds) to release topic ownership. If the time interval
-# is less than zero, the ownership will never be released automatically.
-# Default is 0.
-# retention_secs=0
-
-# Time interval (in milliseconds) to run messages consumed timer task to
-# delete those consumed ledgers in BookKeeper.
-# messages_consumed_thread_run_interval=60000
-
-# Default maximum number of messages which can be delivered to a subscriber
-# without being consumed. We pause message delivery to a subscriber when
-# reaching the window size. Default is 0, which means we never pause message
-# delivery, even if a subscriber consumes nothing and has not set a
-# subscriber-specific message window size.
-# default_message_window_size=0
-
-# The maximum number of entries stored in a ledger. When the number of entries
-# reaches this threshold, the hub server will open a new ledger to write to.
-# Default is 0. If set to 0, the hub server will keep using the same ledger to
-# write entries unless the topic ownership changes.
-# max_entries_per_ledger=0
-
-################################
-# Region Related Settings
-################################
-
-# Region name that the hub server belongs to.
-# region=standalone
-
-# Regions list of a Hedwig instance.
-# The expected format for the regions parameter is Hostname:Port:SSLPort
-# with spaces in between each of regions.
-# regions=
-
-# Whether to enable SSL connections between regions.
-# (@Deprecated here. It is recommended to set in conf/hw_region_client.conf)
-# Default is false.
-# inter_region_ssl_enabled=false
-
-# Time interval (in milliseconds) to run thread to retry those failed
-# remote subscriptions in asynchronous mode. Default is 120000.
-# retry_remote_subscribe_thread_run_interval=120000
-
-################################
-# ReadAhead Settings
-################################
-
-# Enable read ahead cache or not. If disabled, read requests
-# would access BookKeeper directly.
-# Default is true.
-# readahead_enabled=true
-
-# Number of entries to read ahead. Default value is 10.
-# readahead_count=10
-
-# Max size of entries to read ahead. Default value is 4M.
-# readahead_size=4194304
-
-# Max memory used for ReadAhead Cache.
-# Default value is minimum value of 2G or half of JVM max memory.
-# cache_size=
-
-# The backoff time (in milliseconds) to retry scans after failures.
-# Default value is 1000.
-# scan_backoff_ms=1000
-
-# Sets the number of threads to be used for the read-ahead mechanism.
-# Default is the number of cores as returned with a call to 
-# <code>Runtime.getRuntime().availableProcessors()</code>.
-# num_readahead_cache_threads=
-
-# Set TTL for cache entries. Each time a new entry is added to the cache,
-# expired cache entries are discarded. If the value is set
-# to zero or less than zero, cache entries will not be evicted until the
-# cache is full or the messages are already consumed. By default
-# the value is zero.
-# cache_entry_ttl=
-
-################################
-# Metadata Settings
-################################
-
-# zookeeper prefix to store metadata if using zookeeper as metadata store.
-# Default value is "/hedwig".
-# zk_prefix=/hedwig
-
-# Enable metadata manager based topic manager. Default is false.
-# metadata_manager_based_topic_manager_enabled=false
-
-# Class name of metadata manager factory used to store metadata.
-# Default is null.
-# metadata_manager_factory_class=
-
-################################
-# BookKeeper Settings
-################################
-
-# Ensemble size of a ledger in BookKeeper. Default is 3.
-# bk_ensemble_size=3
-
-# Write quorum size for a ledger in BookKeeper. Default is 2.
-# bk_write_quorum_size=2
-
-# Ack quorum size for a ledger in BookKeeper. Default is 2.
-# bk_ack_quorum_size=2

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/hwenv.sh
----------------------------------------------------------------------
diff --git a/hedwig-server/conf/hwenv.sh b/hedwig-server/conf/hwenv.sh
deleted file mode 100644
index 8d379b6..0000000
--- a/hedwig-server/conf/hwenv.sh
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/bin/sh
-#
-#/**
-# * Copyright 2007 The Apache Software Foundation
-# *
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-# Set JAVA_HOME here to override the environment setting
-# JAVA_HOME=
-
-# default settings for starting hedwig
-# HEDWIG_SERVER_CONF=
-
-# default settings for the region manager's hedwig client
-# HEDWIG_REGION_CLIENT_CONF=
-
-# default settings for the region manager's hedwig client
-# HEDWIG_CLIENT_CONF=
-
-# Server part configuration for hedwig console,
-# used for metadata management
-# HEDWIG_CONSOLE_SERVER_CONF=
-
-# Client part configuration for hedwig console,
-# used for interacting with hub server.
-# HEDWIG_CONSOLE_CLIENT_CONF=
-
-# Log4j configuration file
-# HEDWIG_LOG_CONF=
-
-# Logs location
-# HEDWIG_LOG_DIR=
-
-# Extra options to be passed to the jvm
-# HEDWIG_EXTRA_OPTS=
-
-#Folder where the hedwig server PID file should be stored
-#HEDWIG_PID_DIR=
-
-#Wait time (in seconds) before forcefully killing the hedwig server instance if the stop is not successful
-#HEDWIG_STOP_TIMEOUT=

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/hedwig-server/conf/log4j.properties b/hedwig-server/conf/log4j.properties
deleted file mode 100644
index c0f1c49..0000000
--- a/hedwig-server/conf/log4j.properties
+++ /dev/null
@@ -1,78 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#
-
-#
-# Hedwig Logging Configuration
-#
-
-# Format is "<default threshold> (, <appender>)+"
-
-# DEFAULT: console appender only
-# Define some default values that can be overridden by system properties
-hedwig.root.logger=WARN,CONSOLE
-hedwig.log.dir=.
-hedwig.log.file=hedwig-server.log
-hedwig.trace.file=hedwig-trace.log
-
-log4j.rootLogger=${hedwig.root.logger}
-
-# Example with rolling log file
-#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
-
-# Example with rolling log file and tracing
-#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
-
-#
-# Log INFO level and above messages to the console
-#
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=INFO
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-#
-# Add ROLLINGFILE to rootLogger to get log file output
-#    Log INFO level and above messages to a log file
-log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.ROLLINGFILE.Threshold=INFO
-log4j.appender.ROLLINGFILE.File=${hedwig.log.dir}/${hedwig.log.file}
-log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-# Max log file size of 10MB
-#log4j.appender.ROLLINGFILE.MaxFileSize=10MB
-# uncomment the next line to limit number of backup files
-#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
-
-log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
-
-
-#
-# Add TRACEFILE to rootLogger to get log file output
-#    Log TRACE level and above messages to a log file
-log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
-log4j.appender.TRACEFILE.Threshold=TRACE
-log4j.appender.TRACEFILE.File=${hedwig.log.dir}/${hedwig.trace.file}
-
-log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
-### Notice we are including log4j's NDC here (%x)
-log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/pom.xml
----------------------------------------------------------------------
diff --git a/hedwig-server/pom.xml b/hedwig-server/pom.xml
deleted file mode 100644
index b460f25..0000000
--- a/hedwig-server/pom.xml
+++ /dev/null
@@ -1,294 +0,0 @@
-<?xml version="1.0"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>bookkeeper</artifactId>
-    <version>4.4.0-SNAPSHOT</version>
-  </parent>
-  <properties>
-      <mainclass>org.apache.hedwig.server.netty.PubSubServer</mainclass>
-      <project.libdir>${basedir}/lib</project.libdir>
-  </properties>
-  <artifactId>hedwig-server</artifactId>
-  <packaging>jar</packaging>
-  <name>hedwig-server</name>
-  <url>http://maven.apache.org</url>
-  <dependencies>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.8.1</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>1.6.4</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>1.6.4</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>hedwig-client</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>compile</scope>
-      <type>jar</type>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.derby</groupId>
-        <artifactId>derby</artifactId>
-        <version>10.8.2.2</version>
-        <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <version>${zookeeper.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <version>${zookeeper.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>bookkeeper-server</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>compile</scope>
-      <type>jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>bookkeeper-server</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <!--
-        Annoying dependency we need to include because
-        zookeeper uses log4j and so we transitively do, but
-        log4j has some dependencies which aren't in the 
-        default maven repositories
-    //-->
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <version>1.2.15</version>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.mail</groupId>
-          <artifactId>mail</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.jms</groupId>
-          <artifactId>jms</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jdmk</groupId>
-          <artifactId>jmxtools</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jmx</groupId>
-          <artifactId>jmxri</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>jline</groupId>
-      <artifactId>jline</artifactId>
-      <version>0.9.94</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>hedwig-server-compat420</artifactId>
-      <version>4.2.0</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>bookkeeper-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-protocol</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-client</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>hedwig-server-compat410</artifactId>
-      <version>4.1.0</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>bookkeeper-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-protocol</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-client</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>hedwig-server-compat400</artifactId>
-      <version>4.0.0</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>bookkeeper-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-server</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-protocol</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.bookkeeper</groupId>
-          <artifactId>hedwig-client</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-        <version>0.7</version>
-        <configuration>
-          <excludes>
-            <exclude>**/p12.pass</exclude>
-          </excludes>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>2.2.1</version>
-        <configuration>
-          <descriptors>
-            <descriptor>../src/assemble/bin.xml</descriptor>
-          </descriptors>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <configuration>
-          <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>createbuilddir</id>
-            <phase>generate-test-resources</phase>
-            <configuration>
-              <target>
-                <mkdir dir="target/zk_clientbase_build" />
-              </target>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${project.libdir}</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <systemPropertyVariables>
-            <derby.stream.error.file>target/derby.log</derby.stream.error.file>
-            <build.test.dir>target/zk_clientbase_build</build.test.dir>
-          </systemPropertyVariables>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-clean-plugin</artifactId>
-        <version>2.5</version>
-	<configuration>
-	  <filesets>
-            <fileset>
-              <directory>${project.libdir}</directory>
-              <followSymlinks>false</followSymlinks>
-            </fileset>
-	  </filesets>
-	</configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
deleted file mode 100644
index ec38fc2..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.FactoryLayout;
-import org.apache.hedwig.server.meta.SubscriptionDataManager;
-import org.apache.hedwig.server.meta.TopicOwnershipManager;
-import org.apache.hedwig.server.meta.TopicPersistenceManager;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.server.topics.HubLoad;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.protobuf.ByteString;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Hedwig Admin
- */
-public class HedwigAdmin {
-    private static final Logger LOG = LoggerFactory.getLogger(HedwigAdmin.class);
-
-    // NOTE: now it is fixed passwd used in hedwig
-    static byte[] passwd = "sillysecret".getBytes(UTF_8);
-
-    protected final ZooKeeper zk;
-    protected final BookKeeper bk;
-    protected final MetadataManagerFactory mmFactory;
-    protected final SubscriptionDataManager sdm;
-    protected final TopicOwnershipManager tom;
-    protected final TopicPersistenceManager tpm;
-
-    // hub configurations
-    protected final ServerConfiguration serverConf;
-    // bookkeeper configurations
-    protected final ClientConfiguration bkClientConf;
-
-    protected final CountDownLatch zkReadyLatch = new CountDownLatch(1);
-
-    // Empty watcher
-    private class MyWatcher implements Watcher {
-        public void process(WatchedEvent event) {
-            if (Event.KeeperState.SyncConnected.equals(event.getState())) {
-                zkReadyLatch.countDown();
-            }
-        }
-    }
-
-    static class SyncObj<T> {
-        boolean finished = false;
-        boolean success = false;
-        T value = null;
-        PubSubException exception = null;
-
-        synchronized void success(T v) {
-            finished = true;
-            success = true;
-            value = v;
-            notify();
-        }
-
-        synchronized void fail(PubSubException pse) {
-            finished = true;
-            success = false;
-            exception = pse;
-            notify();
-        }
-
-        synchronized void block() {
-            try {
-                while (!finished) {
-                    wait();
-                }
-            } catch (InterruptedException ie) {
-            }
-        }
-
-        synchronized boolean isSuccess() {
-            return success;
-        }
-    }
-
-    /**
-     * Stats of a hub
-     */
-    public static class HubStats {
-        HubInfo hubInfo;
-        HubLoad hubLoad;
-
-        public HubStats(HubInfo info, HubLoad load) {
-            this.hubInfo = info;
-            this.hubLoad = load;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("info : [").append(hubInfo.toString().trim().replaceAll("\n", ", "))
-              .append("], load : [").append(hubLoad.toString().trim().replaceAll("\n", ", "))
-              .append("]");
-            return sb.toString();
-        }
-    }
-
-    /**
-     * Hedwig Admin Constructor
-     *
-     * @param bkConf
-     *          BookKeeper Client Configuration.
-     * @param hubConf
-     *          Hub Server Configuration.
-     * @throws Exception
-     */
-    public HedwigAdmin(ClientConfiguration bkConf, ServerConfiguration hubConf) throws Exception {
-        this.serverConf = hubConf;
-        this.bkClientConf = bkConf;
-
-        // connect to zookeeper
-        zk = new ZooKeeper(hubConf.getZkHost(), hubConf.getZkTimeout(), new MyWatcher());
-        LOG.debug("Connecting to zookeeper {}, timeout = {}",
-                hubConf.getZkHost(), hubConf.getZkTimeout());
-        // wait until connection is ready
-        if (!zkReadyLatch.await(hubConf.getZkTimeout() * 2, TimeUnit.MILLISECONDS)) {
-            throw new Exception("Count not establish connection with ZooKeeper after " + hubConf.getZkTimeout() * 2 + " ms.");
-        }
-
-        // construct the metadata manager factory
-        mmFactory = MetadataManagerFactory.newMetadataManagerFactory(hubConf, zk);
-        tpm = mmFactory.newTopicPersistenceManager();
-        tom = mmFactory.newTopicOwnershipManager();
-        sdm = mmFactory.newSubscriptionDataManager();
-
-        // connect to bookkeeper
-        bk = new BookKeeper(bkClientConf, zk);
-        LOG.debug("Connecting to bookkeeper");
-    }
-
-    /**
-     * Close the hedwig admin.
-     *
-     * @throws Exception
-     */
-    public void close() throws Exception {
-        tpm.close();
-        tom.close();
-        sdm.close();
-        mmFactory.shutdown();
-        bk.close();
-        zk.close();
-    }
-
-    /**
-     * Return zookeeper handle used in hedwig admin.
-     *
-     * @return zookeeper handle
-     */
-    public ZooKeeper getZkHandle() {
-        return zk;
-    }
-
-    /**
-     * Return bookkeeper handle used in hedwig admin.
-     *
-     * @return bookkeeper handle
-     */
-    public BookKeeper getBkHandle() {
-        return bk;
-    }
-
-    /**
-     * Return hub server configuration used in hedwig admin
-     *
-     * @return hub server configuration
-     */
-    public ServerConfiguration getHubServerConf() {
-        return serverConf;
-    }
-
-    /**
-     * Return metadata manager factory.
-     *
-     * @return metadata manager factory instance.
-     */
-    public MetadataManagerFactory getMetadataManagerFactory() {
-        return mmFactory;
-    }
-
-    /**
-     * Return bookeeper passwd used in hedwig admin
-     *
-     * @return bookeeper passwd
-     */
-    public byte[] getBkPasswd() {
-        return Arrays.copyOf(passwd, passwd.length);
-    }
-
-    /**
-     * Return digest type used in hedwig admin
-     *
-     * @return bookeeper digest type
-     */
-    public DigestType getBkDigestType() {
-        return DigestType.CRC32;
-    }
-
-    /**
-     * Dose topic exist?
-     *
-     * @param topic
-     *            Topic name
-     * @return whether topic exists or not?
-     * @throws Exception
-     */
-    public boolean hasTopic(ByteString topic) throws Exception {
-        // current persistence info is bound with a topic, so if there is persistence info
-        // there is topic.
-        final SyncObj<Boolean> syncObj = new SyncObj<Boolean>();
-        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
-            @Override
-            public void operationFinished(Object ctx, Versioned<LedgerRanges> result) {
-                if (null == result) {
-                    syncObj.success(false);
-                } else {
-                    syncObj.success(true);
-                }
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException pse) {
-                syncObj.fail(pse);
-            }
-        }, syncObj);
-
-        syncObj.block();
-
-        if (!syncObj.isSuccess()) {
-            throw syncObj.exception;
-        }
-
-        return syncObj.value;
-    }
-
-    /**
-     * Get available hubs.
-     *
-     * @return available hubs and their loads
-     * @throws Exception
-     */
-    public Map<HedwigSocketAddress, HubStats> getAvailableHubs() throws Exception {
-        String zkHubsPath = serverConf.getZkHostsPrefix(new StringBuilder()).toString();
-        Map<HedwigSocketAddress, HubStats> hubs =
-            new HashMap<HedwigSocketAddress, HubStats>();
-        List<String> hosts = zk.getChildren(zkHubsPath, false);
-        for (String host : hosts) {
-            String zkHubPath = serverConf.getZkHostsPrefix(new StringBuilder())
-                                         .append("/").append(host).toString();
-            HedwigSocketAddress addr = new HedwigSocketAddress(host);
-            try {
-                Stat stat = new Stat();
-                byte[] data = zk.getData(zkHubPath, false, stat);
-                if (data == null) {
-                    continue;
-                }
-                HubLoad load = HubLoad.parse(new String(data, UTF_8));
-                HubInfo info = new HubInfo(addr, stat.getCzxid());
-                hubs.put(addr, new HubStats(info, load));
-            } catch (KeeperException ke) {
-                LOG.warn("Couldn't read hub data from ZooKeeper", ke);
-            } catch (InterruptedException ie) {
-                LOG.warn("Interrupted during read", ie);
-            }
-        }
-        return hubs;
-    }
-
-    /**
-     * Get list of topics
-     *
-     * @return list of topics
-     * @throws Exception
-     */
-    public Iterator<ByteString> getTopics() throws Exception {
-        return mmFactory.getTopics();
-    }
-
-    /**
-     * Return the topic owner of a topic
-     *
-     * @param topic
-     *            Topic name
-     * @return the address of the owner of a topic
-     * @throws Exception
-     */
-    public HubInfo getTopicOwner(ByteString topic) throws Exception {
-        final SyncObj<HubInfo> syncObj = new SyncObj<HubInfo>();
-        tom.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
-            @Override
-            public void operationFinished(Object ctx, Versioned<HubInfo> result) {
-                if (null == result) {
-                    syncObj.success(null);
-                } else {
-                    syncObj.success(result.getValue());
-                }
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException pse) {
-                syncObj.fail(pse);
-            }
-        }, syncObj);
-
-        syncObj.block();
-
-        if (!syncObj.isSuccess()) {
-            throw syncObj.exception;
-        }
-
-        return syncObj.value;
-    }
-
-    private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger, MessageSeqId endOfLedger) {
-        LedgerRange.Builder builder =
-            LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
-                       .setEndSeqIdIncluded(endOfLedger);
-        return builder.build();
-    }
-
-    /**
-     * Return the ledger range forming the topic
-     *
-     * @param topic
-     *          Topic name
-     * @return ledger ranges forming the topic
-     * @throws Exception
-     */
-    public List<LedgerRange> getTopicLedgers(ByteString topic) throws Exception {
-        final SyncObj<LedgerRanges> syncObj = new SyncObj<LedgerRanges>();
-        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
-            @Override
-            public void operationFinished(Object ctx, Versioned<LedgerRanges> result) {
-                if (null == result) {
-                    syncObj.success(null);
-                } else {
-                    syncObj.success(result.getValue());
-                }
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException pse) {
-                syncObj.fail(pse);
-            }
-        }, syncObj);
-
-        syncObj.block();
-
-        if (!syncObj.isSuccess()) {
-            throw syncObj.exception;
-        }
-
-        LedgerRanges ranges = syncObj.value;
-        if (null == ranges) {
-            return null;
-        }
-        List<LedgerRange> results = new ArrayList<LedgerRange>();
-        List<LedgerRange> lrs = ranges.getRangesList();
-        long startSeqId = 1L;
-        if (!lrs.isEmpty()) {
-            LedgerRange range = lrs.get(0);
-            if (!range.hasStartSeqIdIncluded() && range.hasEndSeqIdIncluded()) {
-                long ledgerId = range.getLedgerId();
-                try {
-                    LedgerHandle lh = bk.openLedgerNoRecovery(ledgerId, DigestType.CRC32, passwd);
-                    long numEntries = lh.readLastConfirmed() + 1;
-                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
-                    startSeqId = endOfLedger - numEntries + 1;
-                } catch (BKException.BKNoSuchLedgerExistsException be) {
-                    // ignore it
-                }
-            }
-        }
-        Iterator<LedgerRange> lrIter = lrs.iterator();
-        while (lrIter.hasNext()) {
-            LedgerRange range = lrIter.next();
-            if (range.hasEndSeqIdIncluded()) {
-                long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
-                if (range.hasStartSeqIdIncluded()) {
-                    startSeqId = range.getStartSeqIdIncluded();
-                } else {
-                    range = buildLedgerRange(range.getLedgerId(), startSeqId, range.getEndSeqIdIncluded());
-                }
-                results.add(range);
-                if (startSeqId < endOfLedger + 1) {
-                    startSeqId = endOfLedger + 1;
-                }
-                continue;
-            }
-            if (lrIter.hasNext()) {
-                throw new IllegalStateException("Ledger " + range.getLedgerId() + " for topic " + topic.toString()
-                                                + " is not the last one but still does not have an end seq-id");
-            }
-
-            if (range.hasStartSeqIdIncluded()) {
-                startSeqId = range.getStartSeqIdIncluded();
-            }
-
-            LedgerHandle lh = bk.openLedgerNoRecovery(range.getLedgerId(), DigestType.CRC32, passwd);
-            long endOfLedger = startSeqId + lh.readLastConfirmed();
-            MessageSeqId endSeqId = MessageSeqId.newBuilder().setLocalComponent(endOfLedger).build();
-            results.add(buildLedgerRange(range.getLedgerId(), startSeqId, endSeqId));
-        }
-        return results;
-    }
-
-    /**
-     * Return subscriptions of a topic
-     *
-     * @param topic
-     *          Topic name
-     * @return subscriptions of a topic
-     * @throws Exception
-     */
-    public Map<ByteString, SubscriptionData> getTopicSubscriptions(ByteString topic)
-        throws Exception {
-
-        final SyncObj<Map<ByteString, SubscriptionData>> syncObj =
-            new SyncObj<Map<ByteString, SubscriptionData>>();
-        sdm.readSubscriptions(topic, new Callback<Map<ByteString, Versioned<SubscriptionData>>>() {
-            @Override
-            public void operationFinished(Object ctx, Map<ByteString, Versioned<SubscriptionData>> result) {
-                // It was just used to console tool to print some information, so don't need to return version for it
-                // just keep the getTopicSubscriptions interface as before
-                Map<ByteString, SubscriptionData> subs = new ConcurrentHashMap<ByteString, SubscriptionData>();
-                for (Map.Entry<ByteString, Versioned<SubscriptionData>> subEntry : result.entrySet()) {
-                    subs.put(subEntry.getKey(), subEntry.getValue().getValue());
-                }
-                syncObj.success(subs);
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException pse) {
-                syncObj.fail(pse);
-            }
-        }, syncObj);
-
-        syncObj.block();
-
-        if (!syncObj.isSuccess()) {
-            throw syncObj.exception;
-        }
-
-        return syncObj.value;
-    }
-
-    /**
-     * Return subscription state of a subscriber of topic
-     *
-     * @param topic
-     *          Topic name
-     * @param subscriber
-     *          Subscriber name
-     * @return subscription state
-     * @throws Exception
-     */
-    public SubscriptionData getSubscription(ByteString topic, ByteString subscriber) throws Exception {
-        final SyncObj<SubscriptionData> syncObj = new SyncObj<SubscriptionData>();
-        sdm.readSubscriptionData(topic, subscriber, new Callback<Versioned<SubscriptionData>>() {
-            @Override
-            public void operationFinished(Object ctx, Versioned<SubscriptionData> result) {
-                if (null == result) {
-                    syncObj.success(null);
-                } else {
-                    syncObj.success(result.getValue());
-                }
-            }
-            @Override
-            public void operationFailed(Object ctx, PubSubException pse) {
-                syncObj.fail(pse);
-            }
-        }, syncObj);
-
-        syncObj.block();
-
-        if (!syncObj.isSuccess()) {
-            throw syncObj.exception;
-        }
-
-        return syncObj.value;
-    }
-
-    /**
-     * Format metadata for Hedwig.
-     */
-    public void format() throws Exception {
-        // format metadata first
-        mmFactory.format(serverConf, zk);
-        LOG.info("Formatted Hedwig metadata successfully.");
-        // remove metadata layout
-        FactoryLayout.deleteLayout(zk, serverConf);
-        LOG.info("Removed old factory layout.");
-        // create new metadata manager factory and write new metadata layout
-        MetadataManagerFactory.createMetadataManagerFactory(serverConf, zk,
-            serverConf.getMetadataManagerFactoryClass());
-        LOG.info("Created new factory layout.");
-    }
-}


Mime
View raw message