zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1564946 [2/4] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/ bookkeeper-server/src/main/java/org/apache/bookkeepe...
Date Wed, 05 Feb 2014 21:43:40 GMT
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Wed Feb  5 21:43:39 2014
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
@@ -37,6 +36,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
@@ -59,7 +59,7 @@ class PendingReadOp implements Enumerati
     final private ScheduledExecutorService scheduler;
     private ScheduledFuture<?> speculativeTask = null;
     Queue<LedgerEntryRequest> seq;
-    Set<InetSocketAddress> heardFromHosts;
+    Set<BookieSocketAddress> heardFromHosts;
     ReadCallback cb;
     Object ctx;
     LedgerHandle lh;
@@ -79,12 +79,12 @@ class PendingReadOp implements Enumerati
         int firstError = BKException.Code.OK;
         int numMissedEntryReads = 0;
 
-        final ArrayList<InetSocketAddress> ensemble;
+        final ArrayList<BookieSocketAddress> ensemble;
         final List<Integer> writeSet;
         final BitSet sentReplicas;
         final BitSet erroredReplicas;
 
-        LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, long eId) {
+        LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
             super(lId, eId);
 
             this.ensemble = ensemble;
@@ -93,7 +93,7 @@ class PendingReadOp implements Enumerati
             this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
         }
 
-        private int getReplicaIndex(InetSocketAddress host) {
+        private int getReplicaIndex(BookieSocketAddress host) {
             int bookieIndex = ensemble.indexOf(host);
             if (bookieIndex == -1) {
                 return NOT_FOUND;
@@ -112,9 +112,9 @@ class PendingReadOp implements Enumerati
             return b;
         }
 
-        private BitSet getHeardFromBitSet(Set<InetSocketAddress> heardFromHosts) {
+        private BitSet getHeardFromBitSet(Set<BookieSocketAddress> heardFromHosts) {
             BitSet b = new BitSet(ensemble.size());
-            for (InetSocketAddress i : heardFromHosts) {
+            for (BookieSocketAddress i : heardFromHosts) {
                 int index = ensemble.indexOf(i);
                 if (index != -1) {
                     b.set(index);
@@ -132,7 +132,7 @@ class PendingReadOp implements Enumerati
          * This returns the host we may have sent to for unit testing.
          * @return host we sent to if we sent. null otherwise.
          */
-        synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
+        synchronized BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
             if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
                 return null;
             }
@@ -150,7 +150,7 @@ class PendingReadOp implements Enumerati
             }
         }
 
-        synchronized InetSocketAddress sendNextRead() {
+        synchronized BookieSocketAddress sendNextRead() {
             if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
                 // we are done, the read has failed from all replicas, just fail the
                 // read
@@ -171,7 +171,7 @@ class PendingReadOp implements Enumerati
             nextReplicaIndexToReadFrom++;
 
             try {
-                InetSocketAddress to = ensemble.get(bookieIndex);
+                BookieSocketAddress to = ensemble.get(bookieIndex);
                 sendReadTo(to, this);
                 sentReplicas.set(replica);
                 return to;
@@ -183,7 +183,7 @@ class PendingReadOp implements Enumerati
             }
         }
 
-        synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) {
+        synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
             if (BKException.Code.OK == firstError ||
                 BKException.Code.NoSuchEntryException == firstError) {
                 firstError = rc;
@@ -216,7 +216,7 @@ class PendingReadOp implements Enumerati
 
         // return true if we managed to complete the entry
         // return false if the read entry is not complete or it is already completed before
-        boolean complete(InetSocketAddress host, final ChannelBuffer buffer) {
+        boolean complete(BookieSocketAddress host, final ChannelBuffer buffer) {
             ChannelBufferInputStream is;
             try {
                 is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
@@ -262,7 +262,7 @@ class PendingReadOp implements Enumerati
         maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
                 - getLedgerMetadata().getAckQuorumSize();
         speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
-        heardFromHosts = new HashSet<InetSocketAddress>();
+        heardFromHosts = new HashSet<BookieSocketAddress>();
 
         readOpLogger = lh.bk.getReadOpLogger();
     }
@@ -281,7 +281,7 @@ class PendingReadOp implements Enumerati
     public void initiate() throws InterruptedException {
         long nextEnsembleChange = startEntryId, i = startEntryId;
         this.requestTimeMillis = MathUtils.now();
-        ArrayList<InetSocketAddress> ensemble = null;
+        ArrayList<BookieSocketAddress> ensemble = null;
 
         if (speculativeReadTimeout > 0) {
             speculativeTask = scheduler.scheduleWithFixedDelay(new Runnable() {
@@ -322,19 +322,19 @@ class PendingReadOp implements Enumerati
     }
 
     private static class ReadContext {
-        final InetSocketAddress to;
+        final BookieSocketAddress to;
         final LedgerEntryRequest entry;
 
-        ReadContext(InetSocketAddress to, LedgerEntryRequest entry) {
+        ReadContext(BookieSocketAddress to, LedgerEntryRequest entry) {
             this.to = to;
             this.entry = entry;
         }
     }
 
-    void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
+    void sendReadTo(BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
         lh.throttler.acquire();
 
-        lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, 
+        lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
                                      this, new ReadContext(to, entry));
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java Wed Feb  5 21:43:39 2014
@@ -18,7 +18,6 @@
 package org.apache.bookkeeper.client;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,6 +30,7 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.conf.Configurable;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
@@ -38,7 +38,6 @@ import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.net.NodeBase;
 import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.bookkeeper.util.StringUtils;
 import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,7 +79,7 @@ public class RackawareEnsemblePlacementP
         /**
          * @return list of addresses representing the ensemble
          */
-        public ArrayList<InetSocketAddress> toList();
+        public ArrayList<BookieSocketAddress> toList();
     }
 
     protected static class TruePredicate implements Predicate {
@@ -97,7 +96,7 @@ public class RackawareEnsemblePlacementP
     protected static class EnsembleForReplacement implements Ensemble {
 
         public static final EnsembleForReplacement instance = new EnsembleForReplacement();
-        static final ArrayList<InetSocketAddress> EMPTY_LIST = new ArrayList<InetSocketAddress>(0);
+        static final ArrayList<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
 
         @Override
         public void addBookie(BookieNode node) {
@@ -105,7 +104,7 @@ public class RackawareEnsemblePlacementP
         }
 
         @Override
-        public ArrayList<InetSocketAddress> toList() {
+        public ArrayList<BookieSocketAddress> toList() {
             return EMPTY_LIST;
         }
 
@@ -181,8 +180,8 @@ public class RackawareEnsemblePlacementP
         }
 
         @Override
-        public ArrayList<InetSocketAddress> toList() {
-            ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(ensembleSize);
+        public ArrayList<BookieSocketAddress> toList() {
+            ArrayList<BookieSocketAddress> addresses = new ArrayList<BookieSocketAddress>(ensembleSize);
             for (BookieNode bn : chosenNodes) {
                 addresses.add(bn.getAddr());
             }
@@ -198,20 +197,20 @@ public class RackawareEnsemblePlacementP
 
     protected static class BookieNode implements Node {
 
-        private final InetSocketAddress addr; // identifier of a bookie node.
+        private final BookieSocketAddress addr; // identifier of a bookie node.
 
         private int level; // the level in topology tree
         private Node parent; // its parent in topology tree
         private String location = NetworkTopology.DEFAULT_RACK; // its network location
         private final String name;
 
-        BookieNode(InetSocketAddress addr, String networkLoc) {
+        BookieNode(BookieSocketAddress addr, String networkLoc) {
             this.addr = addr;
-            this.name = StringUtils.addrToString(addr);
+            this.name = addr.toString();
             setNetworkLocation(networkLoc);
         }
 
-        public InetSocketAddress getAddr() {
+        public BookieSocketAddress getAddr() {
             return addr;
         }
 
@@ -292,18 +291,18 @@ public class RackawareEnsemblePlacementP
     // for now, we just maintain the writable bookies' topology
     private final NetworkTopology topology;
     private DNSToSwitchMapping dnsResolver;
-    private final Map<InetSocketAddress, BookieNode> knownBookies;
+    private final Map<BookieSocketAddress, BookieNode> knownBookies;
     private BookieNode localNode;
     private final ReentrantReadWriteLock rwLock;
 
     public RackawareEnsemblePlacementPolicy() {
         topology = new NetworkTopology();
-        knownBookies = new HashMap<InetSocketAddress, BookieNode>();
+        knownBookies = new HashMap<BookieSocketAddress, BookieNode>();
 
         rwLock = new ReentrantReadWriteLock();
     }
 
-    private BookieNode createBookieNode(InetSocketAddress addr) {
+    private BookieNode createBookieNode(BookieSocketAddress addr) {
         return new BookieNode(addr, resolveNetworkLocation(addr));
     }
 
@@ -322,7 +321,7 @@ public class RackawareEnsemblePlacementP
 
         BookieNode bn;
         try {
-            bn = createBookieNode(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0));
+            bn = createBookieNode(new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0));
         } catch (UnknownHostException e) {
             LOG.error("Failed to get local host address : ", e);
             bn = null;
@@ -338,12 +337,12 @@ public class RackawareEnsemblePlacementP
         // do nothing
     }
 
-    private String resolveNetworkLocation(InetSocketAddress addr) {
+    private String resolveNetworkLocation(BookieSocketAddress addr) {
         List<String> names = new ArrayList<String>(1);
         if (dnsResolver instanceof CachedDNSToSwitchMapping) {
-            names.add(addr.getAddress().getHostAddress());
+            names.add(addr.getSocketAddress().getAddress().getHostAddress());
         } else {
-            names.add(addr.getHostName());
+            names.add(addr.getSocketAddress().getHostName());
         }
         // resolve network addresses
         List<String> rNames = dnsResolver.resolve(names);
@@ -359,12 +358,12 @@ public class RackawareEnsemblePlacementP
     }
 
     @Override
-    public Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
-            Set<InetSocketAddress> readOnlyBookies) {
+    public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
+            Set<BookieSocketAddress> readOnlyBookies) {
         rwLock.writeLock().lock();
         try {
-            ImmutableSet<InetSocketAddress> joinedBookies, leftBookies, deadBookies;
-            Set<InetSocketAddress> oldBookieSet = knownBookies.keySet();
+            ImmutableSet<BookieSocketAddress> joinedBookies, leftBookies, deadBookies;
+            Set<BookieSocketAddress> oldBookieSet = knownBookies.keySet();
             // left bookies : bookies in known bookies, but not in new writable bookie cluster.
             leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy();
             // joined bookies : bookies in new writable bookie cluster, but not in known bookies
@@ -378,7 +377,7 @@ public class RackawareEnsemblePlacementP
             }
 
             // node left
-            for (InetSocketAddress addr : leftBookies) {
+            for (BookieSocketAddress addr : leftBookies) {
                 BookieNode node = knownBookies.remove(addr);
                 topology.remove(node);
                 if (LOG.isDebugEnabled()) {
@@ -387,7 +386,7 @@ public class RackawareEnsemblePlacementP
             }
 
             // node joined
-            for (InetSocketAddress addr : joinedBookies) {
+            for (BookieSocketAddress addr : joinedBookies) {
                 BookieNode node = createBookieNode(addr);
                 topology.add(node);
                 knownBookies.put(addr, node);
@@ -402,9 +401,9 @@ public class RackawareEnsemblePlacementP
         }
     }
 
-    private Set<Node> convertBookiesToNodes(Set<InetSocketAddress> excludeBookies) {
+    private Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> excludeBookies) {
         Set<Node> nodes = new HashSet<Node>();
-        for (InetSocketAddress addr : excludeBookies) {
+        for (BookieSocketAddress addr : excludeBookies) {
             BookieNode bn = knownBookies.get(addr);
             if (null == bn) {
                 bn = createBookieNode(addr);
@@ -415,8 +414,8 @@ public class RackawareEnsemblePlacementP
     }
 
     @Override
-    public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
-            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         rwLock.readLock().lock();
         try {
             Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
@@ -427,7 +426,7 @@ public class RackawareEnsemblePlacementP
             if (numRacks < 2) {
                 List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes,
                         EnsembleForReplacement.instance);
-                ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>(ensembleSize);
+                ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
                 for (BookieNode bn : bns) {
                     addrs.add(bn.addr);
                 }
@@ -454,8 +453,8 @@ public class RackawareEnsemblePlacementP
     }
 
     @Override
-    public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
-            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+    public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
+            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         rwLock.readLock().lock();
         try {
             BookieNode bn = knownBookies.get(bookieToReplace);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java Wed Feb  5 21:43:39 2014
@@ -20,15 +20,16 @@
  */
 package org.apache.bookkeeper.client;
 
+import java.security.GeneralSecurityException;
+
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.versioning.Version;
 
-import java.security.GeneralSecurityException;
-import java.net.InetSocketAddress;
 import java.util.concurrent.RejectedExecutionException;
 
 /**
@@ -106,7 +107,7 @@ class ReadOnlyLedgerHandle extends Ledge
     }
 
     @Override
-    void handleBookieFailure(final InetSocketAddress addr, final int bookieIndex) {
+    void handleBookieFailure(final BookieSocketAddress addr, final int bookieIndex) {
         blockAddCompletions.incrementAndGet();
         synchronized (metadata) {
             try {

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java?rev=1564946&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java Wed Feb  5 21:43:39 2014
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.net;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.COLON;
+
+/**
+ * This is a data wrapper class that is an InetSocketAddress, it would use the hostname
+ * provided in constructors directly.
+ * <p>
+ * The string representation of a BookieSocketAddress is : <hostname>:<port>
+ */
+public class BookieSocketAddress {
+
+    // Member fields that make up this class.
+    private final String hostname;
+    private final int port;
+
+    private final InetSocketAddress socketAddress;
+
+    // Constructor that takes in both a port.
+    public BookieSocketAddress(String hostname, int port) {
+        this.hostname = hostname;
+        this.port = port;
+        socketAddress = new InetSocketAddress(hostname, port);
+    }
+
+    // Constructor from a String "serialized" version of this class.
+    public BookieSocketAddress(String addr) throws UnknownHostException {
+        String[] parts = addr.split(COLON);
+        if (parts.length < 2) {
+            throw new UnknownHostException(addr);
+        }
+        this.hostname = parts[0];
+        try {
+            this.port = Integer.parseInt(parts[1]);
+        } catch (NumberFormatException nfe) {
+            throw new UnknownHostException(addr);
+        }
+        socketAddress = new InetSocketAddress(hostname, port);
+    }
+
+    // Public getters
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    // Method to return an InetSocketAddress for the regular port.
+    public InetSocketAddress getSocketAddress() {
+        return socketAddress;
+    }
+
+    // Return the String "serialized" version of this object.
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(hostname).append(COLON).append(port);
+        return sb.toString();
+    }
+
+    // Implement an equals method comparing two HedwigSocketAddress objects.
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof BookieSocketAddress))
+            return false;
+        BookieSocketAddress that = (BookieSocketAddress) obj;
+        return this.hostname.equals(that.hostname) && (this.port == that.port);
+    }
+
+    @Override
+    public int hashCode() {
+        return this.hostname.hashCode() + 13 * this.port;
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Wed Feb  5 21:43:39 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.proto;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,11 +18,11 @@ package org.apache.bookkeeper.proto;
  * under the License.
  *
  */
+package org.apache.bookkeeper.proto;
 
 import static com.google.common.base.Charsets.UTF_8;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,6 +34,7 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -47,11 +46,10 @@ import org.jboss.netty.buffer.ChannelBuf
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * Implements the client-side part of the BookKeeper protocol.
  *
@@ -64,8 +62,8 @@ public class BookieClient {
 
     final OrderedSafeExecutor executor;
     final ClientSocketChannelFactory channelFactory;
-    final ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient> channels =
-        new ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient>();
+    final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClient> channels =
+        new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClient>();
     final ScheduledExecutorService timeoutExecutor = Executors
             .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
                     .setNameFormat("BKClient-TimeoutTaskExecutor-%d").build());
@@ -88,7 +86,7 @@ public class BookieClient {
         this.statsLogger = statsLogger;
     }
 
-    public PerChannelBookieClient lookupClient(InetSocketAddress addr) {
+    public PerChannelBookieClient lookupClient(BookieSocketAddress addr) {
         PerChannelBookieClient channel = channels.get(addr);
 
         if (channel == null) {
@@ -111,9 +109,9 @@ public class BookieClient {
         return channel;
     }
 
-    public void closeClients(Set<InetSocketAddress> addrs) {
+    public void closeClients(Set<BookieSocketAddress> addrs) {
         final HashSet<PerChannelBookieClient> clients = new HashSet<PerChannelBookieClient>();
-        for (InetSocketAddress a : addrs) {
+        for (BookieSocketAddress a : addrs) {
             PerChannelBookieClient c = channels.get(a);
             if (c != null) {
                 clients.add(c);
@@ -133,7 +131,8 @@ public class BookieClient {
             });
     }
 
-    public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId,
+    public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
+            final long entryId,
             final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) {
         final PerChannelBookieClient client = lookupClient(addr);
         if (client == null) {
@@ -159,7 +158,7 @@ public class BookieClient {
         });
     }
 
-    public void readEntryAndFenceLedger(final InetSocketAddress addr,
+    public void readEntryAndFenceLedger(final BookieSocketAddress addr,
                                         final long ledgerId,
                                         final byte[] masterKey,
                                         final long entryId,
@@ -189,7 +188,7 @@ public class BookieClient {
         });
     }
 
-    public void readEntry(final InetSocketAddress addr, final long ledgerId, final long entryId,
+    public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
                           final ReadEntryCallback cb, final Object ctx) {
         final PerChannelBookieClient client = lookupClient(addr);
         if (client == null) {
@@ -276,7 +275,7 @@ public class BookieClient {
         }
         WriteCallback cb = new WriteCallback() {
 
-            public void writeComplete(int rc, long ledger, long entry, InetSocketAddress addr, Object ctx) {
+            public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) {
                 Counter counter = (Counter) ctx;
                 counter.dec();
                 if (rc != 0) {
@@ -296,7 +295,7 @@ public class BookieClient {
         OrderedSafeExecutor executor = new OrderedSafeExecutor(1,
                 "BookieClientWorker");
         BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
-        InetSocketAddress addr = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
+        BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
 
         for (int i = 0; i < 100000; i++) {
             counter.inc();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java Wed Feb  5 21:43:39 2014
@@ -81,7 +81,7 @@ class BookieNettyServer {
             // listen on all interfaces
             bindAddress = new InetSocketAddress(conf.getBookiePort());
         } else {
-            bindAddress = Bookie.getBookieAddress(conf);
+            bindAddress = Bookie.getBookieAddress(conf).getSocketAddress();
         }
         listenOn(bindAddress);
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java Wed Feb  5 21:43:39 2014
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.proto;
 
-import java.nio.channels.ClosedChannelException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.jboss.netty.channel.Channel;
@@ -33,6 +32,8 @@ import org.jboss.netty.channel.group.Cha
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.channels.ClosedChannelException;
+
 /**
  * Serverside handler for bookkeeper requests
  */

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java Wed Feb  5 21:43:39 2014
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -34,6 +33,7 @@ import java.util.concurrent.TimeoutExcep
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.util.MathUtils;
 import org.jboss.netty.channel.Channel;
@@ -202,7 +202,7 @@ public class BookieRequestProcessor impl
 
     @Override
     public void writeComplete(int rc, long ledgerId, long entryId,
-                    InetSocketAddress addr, Object ctx) {
+                              BookieSocketAddress addr, Object ctx) {
         assert (ctx instanceof AddCtx);
         AddCtx addctx = (AddCtx) ctx;
         addctx.c.write(ResponseBuilder.buildAddResponse(addctx.r));

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Wed Feb  5 21:43:39 2014
@@ -22,35 +22,32 @@ package org.apache.bookkeeper.proto;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.UnknownHostException;
 
-import org.apache.zookeeper.KeeperException;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanRegistry;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.Options;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implements the server-side part of the BookKeeper protocol.
  *
@@ -112,7 +109,7 @@ public class BookieServer {
     }
 
     @VisibleForTesting
-    public InetSocketAddress getLocalAddress() throws UnknownHostException {
+    public BookieSocketAddress getLocalAddress() throws UnknownHostException {
         return Bookie.getBookieAddress(conf);
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java Wed Feb  5 21:43:39 2014
@@ -20,11 +20,9 @@ package org.apache.bookkeeper.proto;
 
 import java.net.UnknownHostException;
 
-import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
-import org.apache.bookkeeper.proto.BKStats;
 import org.apache.bookkeeper.proto.BKStats.OpStatData;
 
 /**
@@ -75,7 +73,7 @@ public class BookieServerBean implements
     @Override
     public String getServerPort() {
         try {
-            return StringUtils.addrToString(Bookie.getBookieAddress(conf));
+            return Bookie.getBookieAddress(conf).toString();
         } catch (UnknownHostException e) {
             return "localhost:" + conf.getBookiePort();
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java Wed Feb  5 21:43:39 2014
@@ -21,11 +21,11 @@
 
 package org.apache.bookkeeper.proto;
 
-import java.net.InetSocketAddress;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.zookeeper.AsyncCallback;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
@@ -62,7 +62,7 @@ public class BookkeeperInternalCallbacks
     }
 
     public interface WriteCallback {
-        void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx);
+        void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx);
     }
 
     public interface GenericCallback<T> {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Wed Feb  5 21:43:39 2014
@@ -18,7 +18,6 @@
 package org.apache.bookkeeper.proto;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.Queue;
@@ -30,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -73,8 +73,7 @@ public class PerChannelBookieClient exte
     static final long maxMemory = Runtime.getRuntime().maxMemory() / 5;
     public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
 
-
-    InetSocketAddress addr;
+    BookieSocketAddress addr;
     AtomicLong totalBytesOutstanding;
     ClientSocketChannelFactory channelFactory;
     OrderedSafeExecutor executor;
@@ -148,20 +147,20 @@ public class PerChannelBookieClient exte
     }
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
-                                  InetSocketAddress addr, AtomicLong totalBytesOutstanding,
+                                  BookieSocketAddress addr, AtomicLong totalBytesOutstanding,
                                   ScheduledExecutorService timeoutExecutor) {
         this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, timeoutExecutor,
                 NullStatsLogger.INSTANCE);
     }
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
-                                  InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
+                                  BookieSocketAddress addr, AtomicLong totalBytesOutstanding) {
         this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, null,
                 NullStatsLogger.INSTANCE);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
-                                  ClientSocketChannelFactory channelFactory, InetSocketAddress addr,
+                                  ClientSocketChannelFactory channelFactory, BookieSocketAddress addr,
                                   AtomicLong totalBytesOutstanding, ScheduledExecutorService timeoutExecutor,
                                   StatsLogger parentStatsLogger) {
         this.conf = conf;
@@ -172,7 +171,7 @@ public class PerChannelBookieClient exte
         this.state = ConnectionState.DISCONNECTED;
 
         StringBuilder nameBuilder = new StringBuilder();
-        nameBuilder.append(addr.getHostName().replace('.', '_').replace('-', '_'))
+        nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', '_'))
             .append("_").append(addr.getPort());
 
         this.statsLogger = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE)
@@ -201,7 +200,7 @@ public class PerChannelBookieClient exte
         bootstrap.setOption("tcpNoDelay", conf.getClientTcpNoDelay());
         bootstrap.setOption("keepAlive", true);
 
-        ChannelFuture future = bootstrap.connect(addr);
+        ChannelFuture future = bootstrap.connect(addr.getSocketAddress());
         future.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
@@ -302,12 +301,19 @@ public class PerChannelBookieClient exte
      * {@link #connectIfNeededAndDoOp(GenericCallback)}
      *
      * @param ledgerId
+     *          Ledger Id
      * @param masterKey
+     *          Master Key
      * @param entryId
+     *          Entry Id
      * @param toSend
+     *          Buffer to send
      * @param cb
+     *          Write callback
      * @param ctx
+     *          Write callback context
      * @param options
+     *          Add options
      */
     void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
                   Object ctx, final int options) {
@@ -799,7 +805,7 @@ public class PerChannelBookieClient exte
             final long requestTimeMillis = MathUtils.now();
             this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() {
                 @Override
-                public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
+                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
                     long latencyMillis = MathUtils.now() - requestTimeMillis;
                     if (rc != BKException.Code.OK) {
                         addEntryOpLogger.registerFailedEvent(latencyMillis);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Wed Feb  5 21:43:39 2014
@@ -20,55 +20,47 @@
  */
 package org.apache.bookkeeper.replication;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Set;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-import java.net.InetSocketAddress;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.BookKeeperAdmin;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
+import com.google.common.collect.Sets;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BookiesListener;
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
-import org.apache.bookkeeper.client.BookiesListener;
-import org.apache.bookkeeper.util.StringUtils;
-
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
-
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
-
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.collections.CollectionUtils;
-import com.google.common.collect.Sets;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Auditor is a single entity in the entire Bookie cluster and will be watching
  * all the bookies under 'ledgerrootpath/available' zkpath. When any of the
@@ -115,7 +107,6 @@ public class Auditor implements BookiesL
 
             this.bkc = new BookKeeper(new ClientConfiguration(conf), zkc);
             this.admin = new BookKeeperAdmin(bkc);
-
         } catch (CompatibilityException ce) {
             throw new UnavailableException(
                     "CompatibilityException while initializing Auditor", ce);
@@ -272,13 +263,13 @@ public class Auditor implements BookiesL
         // Watching on only available bookies is sufficient, as changes in readonly bookies also changes in available
         // bookies
         admin.notifyBookiesChanged(this);
-        Collection<InetSocketAddress> availableBkAddresses = admin.getAvailableBookies();
-        Collection<InetSocketAddress> readOnlyBkAddresses = admin.getReadOnlyBookies();
+        Collection<BookieSocketAddress> availableBkAddresses = admin.getAvailableBookies();
+        Collection<BookieSocketAddress> readOnlyBkAddresses = admin.getReadOnlyBookies();
         availableBkAddresses.addAll(readOnlyBkAddresses);
 
         List<String> availableBookies = new ArrayList<String>();
-        for (InetSocketAddress addr : availableBkAddresses) {
-            availableBookies.add(StringUtils.addrToString(addr));
+        for (BookieSocketAddress addr : availableBkAddresses) {
+            availableBookies.add(addr.toString());
         }
         return availableBookies;
     }
@@ -352,13 +343,12 @@ public class Auditor implements BookiesL
         public void operationComplete(int rc, Set<LedgerFragment> fragments) {
             try {
                 if (rc == BKException.Code.OK) {
-                    Set<InetSocketAddress> bookies = Sets.newHashSet();
+                    Set<BookieSocketAddress> bookies = Sets.newHashSet();
                     for (LedgerFragment f : fragments) {
                         bookies.add(f.getAddress());
                     }
-                    for (InetSocketAddress bookie : bookies) {
-                        publishSuspectedLedgers(StringUtils.addrToString(bookie),
-                                                Sets.newHashSet(lh.getId()));
+                    for (BookieSocketAddress bookie : bookies) {
+                        publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId()));
                     }
                 }
                 lh.close();
@@ -508,7 +498,7 @@ public class Auditor implements BookiesL
 
     /**
      * Return true if auditor is running otherwise return false
-     * 
+     *
      * @return auditor status
      */
     public boolean isRunning() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java Wed Feb  5 21:43:39 2014
@@ -31,7 +31,6 @@ import org.apache.bookkeeper.conf.Server
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
@@ -80,8 +79,7 @@ public class AutoRecoveryMain {
             }
         };
         zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w);
-        auditorElector = new AuditorElector(
-                StringUtils.addrToString(Bookie.getBookieAddress(conf)), conf, zk);
+        auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf, zk);
         replicationWorker = new ReplicationWorker(zk, conf,
                 Bookie.getBookieAddress(conf));
         deathWatcher = new AutoRecoveryDeathWatcher(this);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java Wed Feb  5 21:43:39 2014
@@ -17,7 +17,6 @@
  */
 package org.apache.bookkeeper.replication;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,10 +29,10 @@ import java.util.concurrent.CountDownLat
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
-import org.apache.bookkeeper.util.StringUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,12 +74,12 @@ public class BookieLedgerIndexer {
                     public void operationComplete(final int rc,
                             LedgerMetadata ledgerMetadata) {
                         if (rc == BKException.Code.OK) {
-                            for (Map.Entry<Long, ArrayList<InetSocketAddress>> ensemble : ledgerMetadata
+                            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> ensemble : ledgerMetadata
                                     .getEnsembles().entrySet()) {
-                                for (InetSocketAddress bookie : ensemble
+                                for (BookieSocketAddress bookie : ensemble
                                         .getValue()) {
                                     putLedger(bookie2ledgersMap,
-                                              StringUtils.addrToString(bookie),
+                                              bookie.toString(),
                                               ledgerId);
                                 }
                             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Wed Feb  5 21:43:39 2014
@@ -20,7 +20,6 @@
 package org.apache.bookkeeper.replication;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Set;
 import java.util.Timer;
@@ -29,18 +28,19 @@ import java.util.concurrent.CountDownLat
 
 import org.apache.bookkeeper.bookie.BookieThread;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
+import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
+import org.apache.bookkeeper.client.BKException.BKReadException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
-import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
-import org.apache.bookkeeper.client.BKException.BKReadException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -62,7 +62,7 @@ public class ReplicationWorker implement
     private volatile boolean workerRunning = false;
     final private BookKeeperAdmin admin;
     private LedgerChecker ledgerChecker;
-    private InetSocketAddress targetBookie;
+    private BookieSocketAddress targetBookie;
     private BookKeeper bkc;
     private Thread workerThread;
     private long openLedgerRereplicationGracePeriod;
@@ -82,7 +82,7 @@ public class ReplicationWorker implement
      *            local Bookie address.
      */
     public ReplicationWorker(final ZooKeeper zkc,
-            final ServerConfiguration conf, InetSocketAddress targetBKAddr)
+            final ServerConfiguration conf, BookieSocketAddress targetBKAddr)
             throws CompatibilityException, KeeperException,
             InterruptedException, IOException {
         this.zkc = zkc;
@@ -328,8 +328,8 @@ public class ReplicationWorker implement
 
     private boolean isTargetBookieExistsInFragmentEnsemble(LedgerHandle lh,
             LedgerFragment ledgerFragment) {
-        List<InetSocketAddress> ensemble = ledgerFragment.getEnsemble();
-        for (InetSocketAddress bkAddr : ensemble) {
+        List<BookieSocketAddress> ensemble = ledgerFragment.getEnsemble();
+        for (BookieSocketAddress bkAddr : ensemble) {
             if (targetBookie.equals(bkAddr)) {
                 return true;
             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java Wed Feb  5 21:43:39 2014
@@ -21,22 +21,18 @@ package org.apache.bookkeeper.tools;
  *
  */
 
-import java.io.IOException;
-import org.apache.zookeeper.KeeperException;
-import java.net.InetSocketAddress;
-
-import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.zookeeper.KeeperException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
 /**
  * Provides Admin Tools to manage the BookKeeper cluster.
  *
  */
 public class BookKeeperTools {
-    private final static Logger LOG = LoggerFactory.getLogger(BookKeeperTools.class);
 
     /**
      * Main method so we can invoke the bookie recovery via command line.
@@ -53,7 +49,7 @@ public class BookKeeperTools {
      * @throws KeeperException
      * @throws BKException
      */
-    public static void main(String[] args) 
+    public static void main(String[] args)
             throws InterruptedException, IOException, KeeperException, BKException {
         // Validate the inputs
         if (args.length < 2) {
@@ -67,9 +63,9 @@ public class BookKeeperTools {
             System.err.println("BookieSrc inputted has invalid name format (host:port expected): " + args[1]);
             return;
         }
-        final InetSocketAddress bookieSrc = new InetSocketAddress(bookieSrcString[0], Integer
+        final BookieSocketAddress bookieSrc = new BookieSocketAddress(bookieSrcString[0], Integer
                 .parseInt(bookieSrcString[1]));
-        InetSocketAddress bookieDest = null;
+        BookieSocketAddress bookieDest = null;
         if (args.length < 3) {
             String bookieDestString[] = args[2].split(":");
             if (bookieDestString.length < 2) {
@@ -77,7 +73,7 @@ public class BookKeeperTools {
                                    + args[2]);
                 return;
             }
-            bookieDest = new InetSocketAddress(bookieDestString[0], Integer.parseInt(bookieDestString[1]));
+            bookieDest = new BookieSocketAddress(bookieDestString[0], Integer.parseInt(bookieDestString[1]));
         }
 
         // Create the BookKeeperTools instance and perform the bookie recovery

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java Wed Feb  5 21:43:39 2014
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.util;
  */
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 /**
  * Provided utilites for parsing network addresses, ledger-id from node paths
@@ -32,34 +31,6 @@ public class StringUtils {
     static public final String LEDGER_NODE_PREFIX = "L";
 
     /**
-     * Parses address into IP and port.
-     *
-     * @param addr
-     *            String
-     */
-
-    public static InetSocketAddress parseAddr(String s) throws IOException {
-
-        String parts[] = s.split(":");
-        if (parts.length != 2) {
-            throw new IOException(s + " does not have the form host:port");
-        }
-        int port;
-        try {
-            port = Integer.parseInt(parts[1]);
-        } catch (NumberFormatException e) {
-            throw new IOException(s + " does not have the form host:port");
-        }
-
-        InetSocketAddress addr = new InetSocketAddress(parts[0], port);
-        return addr;
-    }
-
-    public static String addrToString(InetSocketAddress addr) {
-        return addr.getAddress().getHostAddress() + ":" + addr.getPort();
-    }
-
-    /**
      * Formats ledger ID according to ZooKeeper rules
      *
      * @param id

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Wed Feb  5 21:43:39 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,36 +18,37 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.Collections;
 import java.util.Random;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This class tests the bookie recovery admin functionality.
@@ -244,7 +243,7 @@ public class BookieRecoveryTest extends 
         for (int i = 0; i < numEntries; i++) {
             lh.addEntry(data);
         }
-        InetSocketAddress bookieToKill = lh.getLedgerMetadata().getEnsemble(numEntries - 1).get(1);
+        BookieSocketAddress bookieToKill = lh.getLedgerMetadata().getEnsemble(numEntries - 1).get(1);
         killBookie(bookieToKill);
         startNewBookie();
         for (int i = 0; i < numEntries; i++) {
@@ -299,8 +298,10 @@ public class BookieRecoveryTest extends 
         writeEntriestoLedgers(numMsgs, 10, lhs);
 
         // Call the async recover bookie method.
-        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
-        InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+        BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
+                initialPort);
+        BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
+                newBookiePort);
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
                  + bookieDest + ")");
         // Initiate the sync object
@@ -355,8 +356,9 @@ public class BookieRecoveryTest extends 
         writeEntriestoLedgers(numMsgs, 10, lhs);
 
         // Call the async recover bookie method.
-        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
-        InetSocketAddress bookieDest = null;
+        BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
+                initialPort);
+        BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
                  + ") and replicate it to a random available one");
         // Initiate the sync object
@@ -408,8 +410,10 @@ public class BookieRecoveryTest extends 
         writeEntriestoLedgers(numMsgs, 10, lhs);
 
         // Call the sync recover bookie method.
-        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
-        InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+        BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
+                initialPort);
+        BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
+                newBookiePort);
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
                  + bookieDest + ")");
         bkAdmin.recoverBookieData(bookieSrc, bookieDest);
@@ -454,8 +458,9 @@ public class BookieRecoveryTest extends 
         writeEntriestoLedgers(numMsgs, 10, lhs);
 
         // Call the sync recover bookie method.
-        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
-        InetSocketAddress bookieDest = null;
+        BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
+                initialPort);
+        BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
                  + ") and replicate it to a random available one");
         bkAdmin.recoverBookieData(bookieSrc, bookieDest);
@@ -476,7 +481,6 @@ public class BookieRecoveryTest extends 
         @Override
         public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
             if (LOG.isDebugEnabled()) {
-                InetSocketAddress addr = (InetSocketAddress)ctx;
                 LOG.debug("Got " + rc + " for ledger " + ledgerId + " entry " + entryId + " from " + ctx);
             }
             if (rc == BKException.Code.OK) {
@@ -498,7 +502,7 @@ public class BookieRecoveryTest extends 
     private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) throws Exception {
         LedgerMetadata md = getLedgerMetadata(lh);
 
-        Map<Long, ArrayList<InetSocketAddress>> ensembles = md.getEnsembles();
+        Map<Long, ArrayList<BookieSocketAddress>> ensembles = md.getEnsembles();
 
         HashMap<Long, Long> ranges = new HashMap<Long, Long>();
         ArrayList<Long> keyList = Collections.list(
@@ -509,7 +513,7 @@ public class BookieRecoveryTest extends 
         }
         ranges.put(keyList.get(keyList.size()-1), untilEntry);
 
-        for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : ensembles.entrySet()) {
+        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : ensembles.entrySet()) {
             int quorum = md.getAckQuorumSize();
             long startEntryId = e.getKey();
             long endEntryId = ranges.get(startEntryId);
@@ -518,7 +522,7 @@ public class BookieRecoveryTest extends 
 
             ReplicationVerificationCallback cb = new ReplicationVerificationCallback(numRequests);
             for (long i = startEntryId; i < endEntryId; i++) {
-                for (InetSocketAddress addr : e.getValue()) {
+                for (BookieSocketAddress addr : e.getValue()) {
                     bkc.bookieClient.readEntry(addr, lh.getId(), i, cb, addr);
                 }
             }
@@ -577,11 +581,11 @@ public class BookieRecoveryTest extends 
         long numDupes = 0;
         for (LedgerHandle lh : lhs) {
             LedgerMetadata md = getLedgerMetadata(lh);
-            for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : md.getEnsembles().entrySet()) {
-                HashSet<InetSocketAddress> set = new HashSet<InetSocketAddress>();
+            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : md.getEnsembles().entrySet()) {
+                HashSet<BookieSocketAddress> set = new HashSet<BookieSocketAddress>();
                 long fragment = e.getKey();
 
-                for (InetSocketAddress addr : e.getValue()) {
+                for (BookieSocketAddress addr : e.getValue()) {
                     if (set.contains(addr)) {
                         LOG.error("Dupe " + addr + " found in ensemble for fragment " + fragment
                                 + " of ledger " + lh.getId());
@@ -610,15 +614,15 @@ public class BookieRecoveryTest extends 
         closeLedgers(lhs);
 
         // Shutdown last bookie server in last ensemble
-        ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+        ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
                                                        .entrySet().iterator().next().getValue();
-        InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
+        BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
 
         // start a new bookie
         startNewBookie();
 
-        InetSocketAddress bookieDest = null;
+        BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieToKill
                + ") and replicate it to a random available one");
 
@@ -640,15 +644,15 @@ public class BookieRecoveryTest extends 
         writeEntriestoLedgers(numMsgs, 0, lhs);
 
         // Shutdown the first bookie server
-        ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+        ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
                                                        .entrySet().iterator().next().getValue();
-        InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
+        BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
 
         // start a new bookie
         startNewBookie();
 
-        InetSocketAddress bookieDest = null;
+        BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieToKill
                + ") and replicate it to a random available one");
 
@@ -677,13 +681,13 @@ public class BookieRecoveryTest extends 
         writeEntriestoLedgers(numMsgs, 0, lhs);
 
         // Shutdown the first bookie server
-        ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+        ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
                                                        .entrySet().iterator().next().getValue();
         // removed bookie
-        InetSocketAddress bookieToKill = lastEnsemble.get(0);
+        BookieSocketAddress bookieToKill = lastEnsemble.get(0);
         killBookie(bookieToKill);
         // temp failure
-        InetSocketAddress bookieToKill2 = lastEnsemble.get(1);
+        BookieSocketAddress bookieToKill2 = lastEnsemble.get(1);
         ServerConfiguration conf2 = killBookie(bookieToKill2);
 
         // start a new bookie
@@ -720,7 +724,7 @@ public class BookieRecoveryTest extends 
         List<LedgerHandle> newLhs = openLedgers(lhs);
         for (LedgerHandle newLh : newLhs) {
             // first ensemble should contains bookieToKill2 and not contain bookieToKill
-            Map.Entry<Long, ArrayList<InetSocketAddress>> entry =
+            Map.Entry<Long, ArrayList<BookieSocketAddress>> entry =
                 newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next();
             assertFalse(entry.getValue().contains(bookieToKill));
             assertTrue(entry.getValue().contains(bookieToKill2));
@@ -745,8 +749,8 @@ public class BookieRecoveryTest extends 
         bs.remove(0);
 
         // Call the async recover bookie method.
-        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
-        InetSocketAddress bookieDest = null;
+        BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
+                initialPort);
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
                  + ") and replicate it to a random available one");
         // Initiate the sync object
@@ -774,7 +778,7 @@ public class BookieRecoveryTest extends 
         // Shutdown the first bookie server
         LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
         int removeIndex = r.nextInt(bs.size());
-        InetSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress();
+        BookieSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress();
         bs.get(removeIndex).shutdown();
         bs.remove(removeIndex);
 
@@ -808,7 +812,6 @@ public class BookieRecoveryTest extends 
         byte[] passwdCorrect = "AAAAAA".getBytes();
         byte[] passwdBad = "BBBBBB".getBytes();
         DigestType digestCorrect = digestType;
-        DigestType digestBad = (digestType == DigestType.MAC) ? DigestType.CRC32 : DigestType.MAC;
 
         LedgerHandle lh = bkc.createLedger(3, 2, digestCorrect, passwdCorrect);
         long ledgerId = lh.getId();
@@ -817,7 +820,7 @@ public class BookieRecoveryTest extends 
         }
         lh.close();
 
-        InetSocketAddress bookieSrc = bs.get(0).getLocalAddress();
+        BookieSocketAddress bookieSrc = bs.get(0).getLocalAddress();
         bs.get(0).shutdown();
         bs.remove(0);
         startNewBookie();
@@ -919,9 +922,9 @@ public class BookieRecoveryTest extends 
         bkc41.close();
 
         // Startup a new bookie server
-        int newBookiePort = startNewBookie();
+        startNewBookie();
         int removeIndex = 0;
-        InetSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress();
+        BookieSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress();
         bs.get(removeIndex).shutdown();
         bs.remove(removeIndex);
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java Wed Feb  5 21:43:39 2014
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Enumeration;
@@ -28,6 +27,7 @@ import java.util.Random;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
 import org.junit.Before;
 import org.junit.Test;
@@ -107,7 +107,7 @@ public class BookieWriteLedgerTest exten
         startNewBookie();
 
         // Shutdown three bookies in the last ensemble and continue writing
-        ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata()
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -156,7 +156,7 @@ public class BookieWriteLedgerTest exten
         startNewBookie();
 
         // Shutdown three bookies in the last ensemble and continue writing
-        ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata()
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java Wed Feb  5 21:43:39 2014
@@ -18,7 +18,6 @@
 package org.apache.bookkeeper.client;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -31,6 +30,7 @@ import org.apache.bookkeeper.client.Asyn
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -133,7 +133,7 @@ public class LedgerCloseTest extends Boo
         final CountDownLatch recoverDoneLatch = new CountDownLatch(1);
         final CountDownLatch failedLatch = new CountDownLatch(1);
         // kill first bookie to replace with a unauthorize bookie
-        InetSocketAddress bookie = lh.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie = lh.getLedgerMetadata().currentEnsemble.get(0);
         ServerConfiguration conf = killBookie(bookie);
         // replace a unauthorize bookie
         startUnauthorizedBookie(conf, addDoneLatch);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java Wed Feb  5 21:43:39 2014
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.client;
  */
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import org.apache.bookkeeper.bookie.Book
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.test.BaseTestCase;
 import org.junit.Test;
@@ -179,7 +179,7 @@ public class LedgerRecoveryTest extends 
         // kill first bookie server to start a fake one to simulate a slow bookie
         // and failed to add entry on crash
         // until write succeed
-        InetSocketAddress host = beforelh.getLedgerMetadata().currentEnsemble.get(slowBookieIdx);
+        BookieSocketAddress host = beforelh.getLedgerMetadata().currentEnsemble.get(slowBookieIdx);
         ServerConfiguration conf = killBookie(host);
 
         Bookie fakeBookie = new Bookie(conf) {
@@ -253,7 +253,7 @@ public class LedgerRecoveryTest extends 
         bs.add(startBookie(conf, deadBookie1));
 
         // kill first bookie server
-        InetSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
         ServerConfiguration conf1 = killBookie(bookie1);
 
         // Try to recover and fence the ledger after killing one bookie in the
@@ -268,7 +268,7 @@ public class LedgerRecoveryTest extends 
         // restart the first server, kill the second
         bsConfs.add(conf1);
         bs.add(startBookie(conf1));
-        InetSocketAddress bookie2 = lhbefore.getLedgerMetadata().currentEnsemble.get(1);
+        BookieSocketAddress bookie2 = lhbefore.getLedgerMetadata().currentEnsemble.get(1);
         ServerConfiguration conf2 = killBookie(bookie2);
 
         // using async, because this could trigger an assertion
@@ -334,8 +334,8 @@ public class LedgerRecoveryTest extends 
         bs.add(startBookie(conf, deadBookie1));
 
         // kill first bookie server
-        InetSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
-        ServerConfiguration conf1 = killBookie(bookie1);
+        BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
+        killBookie(bookie1);
 
         // Try to recover and fence the ledger after killing one bookie in the
         // ensemble in the ensemble, and another bookie is available in zk but not writtable
@@ -386,9 +386,9 @@ public class LedgerRecoveryTest extends 
             fail("Failed to add " + numEntries + " to ledger handle " + lh.getId());
         }
         // kill first 2 bookies to replace bookies
-        InetSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
         ServerConfiguration conf1 = killBookie(bookie1);
-        InetSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1);
+        BookieSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1);
         ServerConfiguration conf2 = killBookie(bookie2);
 
         // replace these two bookies

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java Wed Feb  5 21:43:39 2014
@@ -21,22 +21,20 @@
 
 package org.apache.bookkeeper.client;
 
-import java.util.Set;
 import java.util.List;
-
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.net.InetSocketAddress;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("deprecation")
 public class SlowBookieTest extends BookKeeperClusterTestCase {
@@ -64,7 +62,7 @@ public class SlowBookieTest extends Book
         final CountDownLatch b0latch = new CountDownLatch(1);
         final CountDownLatch b1latch = new CountDownLatch(1);
         final CountDownLatch addEntrylatch = new CountDownLatch(1);
-        List<InetSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
         try {
             sleepBookie(curEns.get(0), b0latch);
             for (int i = 0; i < 10; i++) {



Mime
View raw message