From commits-return-3575-apmail-zookeeper-commits-archive=zookeeper.apache.org@zookeeper.apache.org Wed Feb 5 21:44:13 2014 Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4328C10AEB for ; Wed, 5 Feb 2014 21:44:13 +0000 (UTC) Received: (qmail 74342 invoked by uid 500); 5 Feb 2014 21:44:09 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 74238 invoked by uid 500); 5 Feb 2014 21:44:08 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 74224 invoked by uid 99); 5 Feb 2014 21:44:08 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Feb 2014 21:44:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Feb 2014 21:44:03 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 110382388994; Wed, 5 Feb 2014 21:43:43 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1564946 [2/4] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/ bookkeeper-server/src/main/java/org/apache/bookkeepe... Date: Wed, 05 Feb 2014 21:43:40 -0000 To: commits@zookeeper.apache.org From: fpj@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140205214343.110382388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Wed Feb 5 21:43:39 2014 @@ -20,7 +20,6 @@ */ package org.apache.bookkeeper.client; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.BitSet; import java.util.Enumeration; @@ -37,6 +36,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; @@ -59,7 +59,7 @@ class PendingReadOp implements Enumerati final private ScheduledExecutorService scheduler; private ScheduledFuture speculativeTask = null; Queue seq; - Set heardFromHosts; + Set heardFromHosts; ReadCallback cb; Object ctx; LedgerHandle lh; @@ -79,12 +79,12 @@ class PendingReadOp implements Enumerati int firstError = BKException.Code.OK; int numMissedEntryReads = 0; - final ArrayList ensemble; + final ArrayList ensemble; final List writeSet; final BitSet sentReplicas; final BitSet erroredReplicas; - LedgerEntryRequest(ArrayList ensemble, long lId, long eId) { + LedgerEntryRequest(ArrayList ensemble, long lId, long eId) { super(lId, eId); this.ensemble = ensemble; @@ -93,7 +93,7 @@ class PendingReadOp implements Enumerati this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); } - private int getReplicaIndex(InetSocketAddress host) { + private int getReplicaIndex(BookieSocketAddress host) { int bookieIndex = ensemble.indexOf(host); if (bookieIndex == -1) { return NOT_FOUND; @@ -112,9 +112,9 @@ class PendingReadOp implements Enumerati return b; } - private BitSet getHeardFromBitSet(Set heardFromHosts) { + private BitSet getHeardFromBitSet(Set heardFromHosts) { BitSet b = new BitSet(ensemble.size()); - for (InetSocketAddress i : heardFromHosts) { + for (BookieSocketAddress i : heardFromHosts) { int index = ensemble.indexOf(i); if (index != -1) { b.set(index); @@ -132,7 +132,7 @@ class PendingReadOp implements Enumerati * This returns the host we may have sent to for unit testing. * @return host we sent to if we sent. null otherwise. */ - synchronized InetSocketAddress maybeSendSpeculativeRead(Set heardFromHosts) { + synchronized BookieSocketAddress maybeSendSpeculativeRead(Set heardFromHosts) { if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { return null; } @@ -150,7 +150,7 @@ class PendingReadOp implements Enumerati } } - synchronized InetSocketAddress sendNextRead() { + synchronized BookieSocketAddress sendNextRead() { if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { // we are done, the read has failed from all replicas, just fail the // read @@ -171,7 +171,7 @@ class PendingReadOp implements Enumerati nextReplicaIndexToReadFrom++; try { - InetSocketAddress to = ensemble.get(bookieIndex); + BookieSocketAddress to = ensemble.get(bookieIndex); sendReadTo(to, this); sentReplicas.set(replica); return to; @@ -183,7 +183,7 @@ class PendingReadOp implements Enumerati } } - synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) { + synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) { if (BKException.Code.OK == firstError || BKException.Code.NoSuchEntryException == firstError) { firstError = rc; @@ -216,7 +216,7 @@ class PendingReadOp implements Enumerati // return true if we managed to complete the entry // return false if the read entry is not complete or it is already completed before - boolean complete(InetSocketAddress host, final ChannelBuffer buffer) { + boolean complete(BookieSocketAddress host, final ChannelBuffer buffer) { ChannelBufferInputStream is; try { is = lh.macManager.verifyDigestAndReturnData(entryId, buffer); @@ -262,7 +262,7 @@ class PendingReadOp implements Enumerati maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize() - getLedgerMetadata().getAckQuorumSize(); speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout(); - heardFromHosts = new HashSet(); + heardFromHosts = new HashSet(); readOpLogger = lh.bk.getReadOpLogger(); } @@ -281,7 +281,7 @@ class PendingReadOp implements Enumerati public void initiate() throws InterruptedException { long nextEnsembleChange = startEntryId, i = startEntryId; this.requestTimeMillis = MathUtils.now(); - ArrayList ensemble = null; + ArrayList ensemble = null; if (speculativeReadTimeout > 0) { speculativeTask = scheduler.scheduleWithFixedDelay(new Runnable() { @@ -322,19 +322,19 @@ class PendingReadOp implements Enumerati } private static class ReadContext { - final InetSocketAddress to; + final BookieSocketAddress to; final LedgerEntryRequest entry; - ReadContext(InetSocketAddress to, LedgerEntryRequest entry) { + ReadContext(BookieSocketAddress to, LedgerEntryRequest entry) { this.to = to; this.entry = entry; } } - void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException { + void sendReadTo(BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException { lh.throttler.acquire(); - lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, + lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, this, new ReadContext(to, entry)); } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java Wed Feb 5 21:43:39 2014 @@ -18,7 +18,6 @@ package org.apache.bookkeeper.client; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +30,7 @@ import java.util.concurrent.locks.Reentr import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.conf.Configurable; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.CachedDNSToSwitchMapping; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.net.NetworkTopology; @@ -38,7 +38,6 @@ import org.apache.bookkeeper.net.Node; import org.apache.bookkeeper.net.NodeBase; import org.apache.bookkeeper.net.ScriptBasedMapping; import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.bookkeeper.util.StringUtils; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +79,7 @@ public class RackawareEnsemblePlacementP /** * @return list of addresses representing the ensemble */ - public ArrayList toList(); + public ArrayList toList(); } protected static class TruePredicate implements Predicate { @@ -97,7 +96,7 @@ public class RackawareEnsemblePlacementP protected static class EnsembleForReplacement implements Ensemble { public static final EnsembleForReplacement instance = new EnsembleForReplacement(); - static final ArrayList EMPTY_LIST = new ArrayList(0); + static final ArrayList EMPTY_LIST = new ArrayList(0); @Override public void addBookie(BookieNode node) { @@ -105,7 +104,7 @@ public class RackawareEnsemblePlacementP } @Override - public ArrayList toList() { + public ArrayList toList() { return EMPTY_LIST; } @@ -181,8 +180,8 @@ public class RackawareEnsemblePlacementP } @Override - public ArrayList toList() { - ArrayList addresses = new ArrayList(ensembleSize); + public ArrayList toList() { + ArrayList addresses = new ArrayList(ensembleSize); for (BookieNode bn : chosenNodes) { addresses.add(bn.getAddr()); } @@ -198,20 +197,20 @@ public class RackawareEnsemblePlacementP protected static class BookieNode implements Node { - private final InetSocketAddress addr; // identifier of a bookie node. + private final BookieSocketAddress addr; // identifier of a bookie node. private int level; // the level in topology tree private Node parent; // its parent in topology tree private String location = NetworkTopology.DEFAULT_RACK; // its network location private final String name; - BookieNode(InetSocketAddress addr, String networkLoc) { + BookieNode(BookieSocketAddress addr, String networkLoc) { this.addr = addr; - this.name = StringUtils.addrToString(addr); + this.name = addr.toString(); setNetworkLocation(networkLoc); } - public InetSocketAddress getAddr() { + public BookieSocketAddress getAddr() { return addr; } @@ -292,18 +291,18 @@ public class RackawareEnsemblePlacementP // for now, we just maintain the writable bookies' topology private final NetworkTopology topology; private DNSToSwitchMapping dnsResolver; - private final Map knownBookies; + private final Map knownBookies; private BookieNode localNode; private final ReentrantReadWriteLock rwLock; public RackawareEnsemblePlacementPolicy() { topology = new NetworkTopology(); - knownBookies = new HashMap(); + knownBookies = new HashMap(); rwLock = new ReentrantReadWriteLock(); } - private BookieNode createBookieNode(InetSocketAddress addr) { + private BookieNode createBookieNode(BookieSocketAddress addr) { return new BookieNode(addr, resolveNetworkLocation(addr)); } @@ -322,7 +321,7 @@ public class RackawareEnsemblePlacementP BookieNode bn; try { - bn = createBookieNode(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0)); + bn = createBookieNode(new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0)); } catch (UnknownHostException e) { LOG.error("Failed to get local host address : ", e); bn = null; @@ -338,12 +337,12 @@ public class RackawareEnsemblePlacementP // do nothing } - private String resolveNetworkLocation(InetSocketAddress addr) { + private String resolveNetworkLocation(BookieSocketAddress addr) { List names = new ArrayList(1); if (dnsResolver instanceof CachedDNSToSwitchMapping) { - names.add(addr.getAddress().getHostAddress()); + names.add(addr.getSocketAddress().getAddress().getHostAddress()); } else { - names.add(addr.getHostName()); + names.add(addr.getSocketAddress().getHostName()); } // resolve network addresses List rNames = dnsResolver.resolve(names); @@ -359,12 +358,12 @@ public class RackawareEnsemblePlacementP } @Override - public Set onClusterChanged(Set writableBookies, - Set readOnlyBookies) { + public Set onClusterChanged(Set writableBookies, + Set readOnlyBookies) { rwLock.writeLock().lock(); try { - ImmutableSet joinedBookies, leftBookies, deadBookies; - Set oldBookieSet = knownBookies.keySet(); + ImmutableSet joinedBookies, leftBookies, deadBookies; + Set oldBookieSet = knownBookies.keySet(); // left bookies : bookies in known bookies, but not in new writable bookie cluster. leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy(); // joined bookies : bookies in new writable bookie cluster, but not in known bookies @@ -378,7 +377,7 @@ public class RackawareEnsemblePlacementP } // node left - for (InetSocketAddress addr : leftBookies) { + for (BookieSocketAddress addr : leftBookies) { BookieNode node = knownBookies.remove(addr); topology.remove(node); if (LOG.isDebugEnabled()) { @@ -387,7 +386,7 @@ public class RackawareEnsemblePlacementP } // node joined - for (InetSocketAddress addr : joinedBookies) { + for (BookieSocketAddress addr : joinedBookies) { BookieNode node = createBookieNode(addr); topology.add(node); knownBookies.put(addr, node); @@ -402,9 +401,9 @@ public class RackawareEnsemblePlacementP } } - private Set convertBookiesToNodes(Set excludeBookies) { + private Set convertBookiesToNodes(Set excludeBookies) { Set nodes = new HashSet(); - for (InetSocketAddress addr : excludeBookies) { + for (BookieSocketAddress addr : excludeBookies) { BookieNode bn = knownBookies.get(addr); if (null == bn) { bn = createBookieNode(addr); @@ -415,8 +414,8 @@ public class RackawareEnsemblePlacementP } @Override - public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, - Set excludeBookies) throws BKNotEnoughBookiesException { + public ArrayList newEnsemble(int ensembleSize, int writeQuorumSize, + Set excludeBookies) throws BKNotEnoughBookiesException { rwLock.readLock().lock(); try { Set excludeNodes = convertBookiesToNodes(excludeBookies); @@ -427,7 +426,7 @@ public class RackawareEnsemblePlacementP if (numRacks < 2) { List bns = selectRandom(ensembleSize, excludeNodes, EnsembleForReplacement.instance); - ArrayList addrs = new ArrayList(ensembleSize); + ArrayList addrs = new ArrayList(ensembleSize); for (BookieNode bn : bns) { addrs.add(bn.addr); } @@ -454,8 +453,8 @@ public class RackawareEnsemblePlacementP } @Override - public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace, - Set excludeBookies) throws BKNotEnoughBookiesException { + public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace, + Set excludeBookies) throws BKNotEnoughBookiesException { rwLock.readLock().lock(); try { BookieNode bn = knownBookies.get(bookieToReplace); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java Wed Feb 5 21:43:39 2014 @@ -20,15 +20,16 @@ */ package org.apache.bookkeeper.client; +import java.security.GeneralSecurityException; + import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.versioning.Version; -import java.security.GeneralSecurityException; -import java.net.InetSocketAddress; import java.util.concurrent.RejectedExecutionException; /** @@ -106,7 +107,7 @@ class ReadOnlyLedgerHandle extends Ledge } @Override - void handleBookieFailure(final InetSocketAddress addr, final int bookieIndex) { + void handleBookieFailure(final BookieSocketAddress addr, final int bookieIndex) { blockAddCompletions.incrementAndGet(); synchronized (metadata) { try { Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java?rev=1564946&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java Wed Feb 5 21:43:39 2014 @@ -0,0 +1,100 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.net; + +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +import static org.apache.bookkeeper.util.BookKeeperConstants.COLON; + +/** + * This is a data wrapper class that is an InetSocketAddress, it would use the hostname + * provided in constructors directly. + *

+ * The string representation of a BookieSocketAddress is : : + */ +public class BookieSocketAddress { + + // Member fields that make up this class. + private final String hostname; + private final int port; + + private final InetSocketAddress socketAddress; + + // Constructor that takes in both a port. + public BookieSocketAddress(String hostname, int port) { + this.hostname = hostname; + this.port = port; + socketAddress = new InetSocketAddress(hostname, port); + } + + // Constructor from a String "serialized" version of this class. + public BookieSocketAddress(String addr) throws UnknownHostException { + String[] parts = addr.split(COLON); + if (parts.length < 2) { + throw new UnknownHostException(addr); + } + this.hostname = parts[0]; + try { + this.port = Integer.parseInt(parts[1]); + } catch (NumberFormatException nfe) { + throw new UnknownHostException(addr); + } + socketAddress = new InetSocketAddress(hostname, port); + } + + // Public getters + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + // Method to return an InetSocketAddress for the regular port. + public InetSocketAddress getSocketAddress() { + return socketAddress; + } + + // Return the String "serialized" version of this object. + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(hostname).append(COLON).append(port); + return sb.toString(); + } + + // Implement an equals method comparing two HedwigSocketAddress objects. + @Override + public boolean equals(Object obj) { + if (!(obj instanceof BookieSocketAddress)) + return false; + BookieSocketAddress that = (BookieSocketAddress) obj; + return this.hostname.equals(that.hostname) && (this.port == that.port); + } + + @Override + public int hashCode() { + return this.hostname.hashCode() + 13 * this.port; + } + +} Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Wed Feb 5 21:43:39 2014 @@ -1,5 +1,3 @@ -package org.apache.bookkeeper.proto; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,11 +18,11 @@ package org.apache.bookkeeper.proto; * under the License. * */ +package org.apache.bookkeeper.proto; import static com.google.common.base.Charsets.UTF_8; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -36,6 +34,7 @@ import java.util.concurrent.locks.Reentr import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; @@ -47,11 +46,10 @@ import org.jboss.netty.buffer.ChannelBuf import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Implements the client-side part of the BookKeeper protocol. * @@ -64,8 +62,8 @@ public class BookieClient { final OrderedSafeExecutor executor; final ClientSocketChannelFactory channelFactory; - final ConcurrentHashMap channels = - new ConcurrentHashMap(); + final ConcurrentHashMap channels = + new ConcurrentHashMap(); final ScheduledExecutorService timeoutExecutor = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() .setNameFormat("BKClient-TimeoutTaskExecutor-%d").build()); @@ -88,7 +86,7 @@ public class BookieClient { this.statsLogger = statsLogger; } - public PerChannelBookieClient lookupClient(InetSocketAddress addr) { + public PerChannelBookieClient lookupClient(BookieSocketAddress addr) { PerChannelBookieClient channel = channels.get(addr); if (channel == null) { @@ -111,9 +109,9 @@ public class BookieClient { return channel; } - public void closeClients(Set addrs) { + public void closeClients(Set addrs) { final HashSet clients = new HashSet(); - for (InetSocketAddress a : addrs) { + for (BookieSocketAddress a : addrs) { PerChannelBookieClient c = channels.get(a); if (c != null) { clients.add(c); @@ -133,7 +131,8 @@ public class BookieClient { }); } - public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId, + public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, + final long entryId, final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) { final PerChannelBookieClient client = lookupClient(addr); if (client == null) { @@ -159,7 +158,7 @@ public class BookieClient { }); } - public void readEntryAndFenceLedger(final InetSocketAddress addr, + public void readEntryAndFenceLedger(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId, @@ -189,7 +188,7 @@ public class BookieClient { }); } - public void readEntry(final InetSocketAddress addr, final long ledgerId, final long entryId, + public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId, final ReadEntryCallback cb, final Object ctx) { final PerChannelBookieClient client = lookupClient(addr); if (client == null) { @@ -276,7 +275,7 @@ public class BookieClient { } WriteCallback cb = new WriteCallback() { - public void writeComplete(int rc, long ledger, long entry, InetSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress addr, Object ctx) { Counter counter = (Counter) ctx; counter.dec(); if (rc != 0) { @@ -296,7 +295,7 @@ public class BookieClient { OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "BookieClientWorker"); BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor); - InetSocketAddress addr = new InetSocketAddress(args[0], Integer.parseInt(args[1])); + BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1])); for (int i = 0; i < 100000; i++) { counter.inc(); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java Wed Feb 5 21:43:39 2014 @@ -81,7 +81,7 @@ class BookieNettyServer { // listen on all interfaces bindAddress = new InetSocketAddress(conf.getBookiePort()); } else { - bindAddress = Bookie.getBookieAddress(conf); + bindAddress = Bookie.getBookieAddress(conf).getSocketAddress(); } listenOn(bindAddress); } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java Wed Feb 5 21:43:39 2014 @@ -20,7 +20,6 @@ */ package org.apache.bookkeeper.proto; -import java.nio.channels.ClosedChannelException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; import org.jboss.netty.channel.Channel; @@ -33,6 +32,8 @@ import org.jboss.netty.channel.group.Cha import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.channels.ClosedChannelException; + /** * Serverside handler for bookkeeper requests */ Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java Wed Feb 5 21:43:39 2014 @@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -34,6 +33,7 @@ import java.util.concurrent.TimeoutExcep import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.util.MathUtils; import org.jboss.netty.channel.Channel; @@ -202,7 +202,7 @@ public class BookieRequestProcessor impl @Override public void writeComplete(int rc, long ledgerId, long entryId, - InetSocketAddress addr, Object ctx) { + BookieSocketAddress addr, Object ctx) { assert (ctx instanceof AddCtx); AddCtx addctx = (AddCtx) ctx; addctx.c.write(ResponseBuilder.buildAddResponse(addctx.r)); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Wed Feb 5 21:43:39 2014 @@ -22,35 +22,32 @@ package org.apache.bookkeeper.proto; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.UnknownHostException; -import org.apache.zookeeper.KeeperException; - import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieCriticalThread; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.ExitCode; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanRegistry; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.replication.AutoRecoveryMain; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.Options; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; - +import org.apache.commons.configuration.ConfigurationException; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Implements the server-side part of the BookKeeper protocol. * @@ -112,7 +109,7 @@ public class BookieServer { } @VisibleForTesting - public InetSocketAddress getLocalAddress() throws UnknownHostException { + public BookieSocketAddress getLocalAddress() throws UnknownHostException { return Bookie.getBookieAddress(conf); } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java Wed Feb 5 21:43:39 2014 @@ -20,11 +20,9 @@ package org.apache.bookkeeper.proto; import java.net.UnknownHostException; -import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; -import org.apache.bookkeeper.proto.BKStats; import org.apache.bookkeeper.proto.BKStats.OpStatData; /** @@ -75,7 +73,7 @@ public class BookieServerBean implements @Override public String getServerPort() { try { - return StringUtils.addrToString(Bookie.getBookieAddress(conf)); + return Bookie.getBookieAddress(conf).toString(); } catch (UnknownHostException e) { return "localhost:" + conf.getBookiePort(); } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java Wed Feb 5 21:43:39 2014 @@ -21,11 +21,11 @@ package org.apache.bookkeeper.proto; -import java.net.InetSocketAddress; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.zookeeper.AsyncCallback; import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; @@ -62,7 +62,7 @@ public class BookkeeperInternalCallbacks } public interface WriteCallback { - void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx); + void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx); } public interface GenericCallback { Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Wed Feb 5 21:43:39 2014 @@ -18,7 +18,6 @@ package org.apache.bookkeeper.proto; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; @@ -30,6 +29,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; @@ -73,8 +73,7 @@ public class PerChannelBookieClient exte static final long maxMemory = Runtime.getRuntime().maxMemory() / 5; public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M - - InetSocketAddress addr; + BookieSocketAddress addr; AtomicLong totalBytesOutstanding; ClientSocketChannelFactory channelFactory; OrderedSafeExecutor executor; @@ -148,20 +147,20 @@ public class PerChannelBookieClient exte } public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, - InetSocketAddress addr, AtomicLong totalBytesOutstanding, + BookieSocketAddress addr, AtomicLong totalBytesOutstanding, ScheduledExecutorService timeoutExecutor) { this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, timeoutExecutor, NullStatsLogger.INSTANCE); } public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, - InetSocketAddress addr, AtomicLong totalBytesOutstanding) { + BookieSocketAddress addr, AtomicLong totalBytesOutstanding) { this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, null, NullStatsLogger.INSTANCE); } public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, - ClientSocketChannelFactory channelFactory, InetSocketAddress addr, + ClientSocketChannelFactory channelFactory, BookieSocketAddress addr, AtomicLong totalBytesOutstanding, ScheduledExecutorService timeoutExecutor, StatsLogger parentStatsLogger) { this.conf = conf; @@ -172,7 +171,7 @@ public class PerChannelBookieClient exte this.state = ConnectionState.DISCONNECTED; StringBuilder nameBuilder = new StringBuilder(); - nameBuilder.append(addr.getHostName().replace('.', '_').replace('-', '_')) + nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', '_')) .append("_").append(addr.getPort()); this.statsLogger = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE) @@ -201,7 +200,7 @@ public class PerChannelBookieClient exte bootstrap.setOption("tcpNoDelay", conf.getClientTcpNoDelay()); bootstrap.setOption("keepAlive", true); - ChannelFuture future = bootstrap.connect(addr); + ChannelFuture future = bootstrap.connect(addr.getSocketAddress()); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -302,12 +301,19 @@ public class PerChannelBookieClient exte * {@link #connectIfNeededAndDoOp(GenericCallback)} * * @param ledgerId + * Ledger Id * @param masterKey + * Master Key * @param entryId + * Entry Id * @param toSend + * Buffer to send * @param cb + * Write callback * @param ctx + * Write callback context * @param options + * Add options */ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb, Object ctx, final int options) { @@ -799,7 +805,7 @@ public class PerChannelBookieClient exte final long requestTimeMillis = MathUtils.now(); this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() { @Override - public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) { + public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { long latencyMillis = MathUtils.now() - requestTimeMillis; if (rc != BKException.Code.OK) { addEntryOpLogger.registerFailedEvent(latencyMillis); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Wed Feb 5 21:43:39 2014 @@ -20,55 +20,47 @@ */ package org.apache.bookkeeper.replication; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.Set; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; - -import java.net.InetSocketAddress; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.client.BookKeeperAdmin; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; +import com.google.common.collect.Sets; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.BookKeeperAdmin; +import org.apache.bookkeeper.client.BookiesListener; import org.apache.bookkeeper.client.LedgerChecker; import org.apache.bookkeeper.client.LedgerFragment; -import org.apache.bookkeeper.client.BookiesListener; -import org.apache.bookkeeper.util.StringUtils; - -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; - +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; - import org.apache.bookkeeper.replication.ReplicationException.BKAuditException; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.commons.collections.CollectionUtils; -import com.google.common.collect.Sets; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** * Auditor is a single entity in the entire Bookie cluster and will be watching * all the bookies under 'ledgerrootpath/available' zkpath. When any of the @@ -115,7 +107,6 @@ public class Auditor implements BookiesL this.bkc = new BookKeeper(new ClientConfiguration(conf), zkc); this.admin = new BookKeeperAdmin(bkc); - } catch (CompatibilityException ce) { throw new UnavailableException( "CompatibilityException while initializing Auditor", ce); @@ -272,13 +263,13 @@ public class Auditor implements BookiesL // Watching on only available bookies is sufficient, as changes in readonly bookies also changes in available // bookies admin.notifyBookiesChanged(this); - Collection availableBkAddresses = admin.getAvailableBookies(); - Collection readOnlyBkAddresses = admin.getReadOnlyBookies(); + Collection availableBkAddresses = admin.getAvailableBookies(); + Collection readOnlyBkAddresses = admin.getReadOnlyBookies(); availableBkAddresses.addAll(readOnlyBkAddresses); List availableBookies = new ArrayList(); - for (InetSocketAddress addr : availableBkAddresses) { - availableBookies.add(StringUtils.addrToString(addr)); + for (BookieSocketAddress addr : availableBkAddresses) { + availableBookies.add(addr.toString()); } return availableBookies; } @@ -352,13 +343,12 @@ public class Auditor implements BookiesL public void operationComplete(int rc, Set fragments) { try { if (rc == BKException.Code.OK) { - Set bookies = Sets.newHashSet(); + Set bookies = Sets.newHashSet(); for (LedgerFragment f : fragments) { bookies.add(f.getAddress()); } - for (InetSocketAddress bookie : bookies) { - publishSuspectedLedgers(StringUtils.addrToString(bookie), - Sets.newHashSet(lh.getId())); + for (BookieSocketAddress bookie : bookies) { + publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId())); } } lh.close(); @@ -508,7 +498,7 @@ public class Auditor implements BookiesL /** * Return true if auditor is running otherwise return false - * + * * @return auditor status */ public boolean isRunning() { Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java Wed Feb 5 21:43:39 2014 @@ -31,7 +31,6 @@ import org.apache.bookkeeper.conf.Server import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.util.ZkUtils; -import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; @@ -80,8 +79,7 @@ public class AutoRecoveryMain { } }; zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w); - auditorElector = new AuditorElector( - StringUtils.addrToString(Bookie.getBookieAddress(conf)), conf, zk); + auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf, zk); replicationWorker = new ReplicationWorker(zk, conf, Bookie.getBookieAddress(conf)); deathWatcher = new AutoRecoveryDeathWatcher(this); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java Wed Feb 5 21:43:39 2014 @@ -17,7 +17,6 @@ */ package org.apache.bookkeeper.replication; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -30,10 +29,10 @@ import java.util.concurrent.CountDownLat import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.replication.ReplicationException.BKAuditException; -import org.apache.bookkeeper.util.StringUtils; import org.apache.zookeeper.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,12 +74,12 @@ public class BookieLedgerIndexer { public void operationComplete(final int rc, LedgerMetadata ledgerMetadata) { if (rc == BKException.Code.OK) { - for (Map.Entry> ensemble : ledgerMetadata + for (Map.Entry> ensemble : ledgerMetadata .getEnsembles().entrySet()) { - for (InetSocketAddress bookie : ensemble + for (BookieSocketAddress bookie : ensemble .getValue()) { putLedger(bookie2ledgersMap, - StringUtils.addrToString(bookie), + bookie.toString(), ledgerId); } } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Wed Feb 5 21:43:39 2014 @@ -20,7 +20,6 @@ package org.apache.bookkeeper.replication; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.List; import java.util.Set; import java.util.Timer; @@ -29,18 +28,19 @@ import java.util.concurrent.CountDownLat import org.apache.bookkeeper.bookie.BookieThread; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException; +import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; +import org.apache.bookkeeper.client.BKException.BKReadException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.client.LedgerChecker; import org.apache.bookkeeper.client.LedgerFragment; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException; -import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; -import org.apache.bookkeeper.client.BKException.BKReadException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; @@ -62,7 +62,7 @@ public class ReplicationWorker implement private volatile boolean workerRunning = false; final private BookKeeperAdmin admin; private LedgerChecker ledgerChecker; - private InetSocketAddress targetBookie; + private BookieSocketAddress targetBookie; private BookKeeper bkc; private Thread workerThread; private long openLedgerRereplicationGracePeriod; @@ -82,7 +82,7 @@ public class ReplicationWorker implement * local Bookie address. */ public ReplicationWorker(final ZooKeeper zkc, - final ServerConfiguration conf, InetSocketAddress targetBKAddr) + final ServerConfiguration conf, BookieSocketAddress targetBKAddr) throws CompatibilityException, KeeperException, InterruptedException, IOException { this.zkc = zkc; @@ -328,8 +328,8 @@ public class ReplicationWorker implement private boolean isTargetBookieExistsInFragmentEnsemble(LedgerHandle lh, LedgerFragment ledgerFragment) { - List ensemble = ledgerFragment.getEnsemble(); - for (InetSocketAddress bkAddr : ensemble) { + List ensemble = ledgerFragment.getEnsemble(); + for (BookieSocketAddress bkAddr : ensemble) { if (targetBookie.equals(bkAddr)) { return true; } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java Wed Feb 5 21:43:39 2014 @@ -21,22 +21,18 @@ package org.apache.bookkeeper.tools; * */ -import java.io.IOException; -import org.apache.zookeeper.KeeperException; -import java.net.InetSocketAddress; - -import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeperAdmin; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; /** * Provides Admin Tools to manage the BookKeeper cluster. * */ public class BookKeeperTools { - private final static Logger LOG = LoggerFactory.getLogger(BookKeeperTools.class); /** * Main method so we can invoke the bookie recovery via command line. @@ -53,7 +49,7 @@ public class BookKeeperTools { * @throws KeeperException * @throws BKException */ - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException, IOException, KeeperException, BKException { // Validate the inputs if (args.length < 2) { @@ -67,9 +63,9 @@ public class BookKeeperTools { System.err.println("BookieSrc inputted has invalid name format (host:port expected): " + args[1]); return; } - final InetSocketAddress bookieSrc = new InetSocketAddress(bookieSrcString[0], Integer + final BookieSocketAddress bookieSrc = new BookieSocketAddress(bookieSrcString[0], Integer .parseInt(bookieSrcString[1])); - InetSocketAddress bookieDest = null; + BookieSocketAddress bookieDest = null; if (args.length < 3) { String bookieDestString[] = args[2].split(":"); if (bookieDestString.length < 2) { @@ -77,7 +73,7 @@ public class BookKeeperTools { + args[2]); return; } - bookieDest = new InetSocketAddress(bookieDestString[0], Integer.parseInt(bookieDestString[1])); + bookieDest = new BookieSocketAddress(bookieDestString[0], Integer.parseInt(bookieDestString[1])); } // Create the BookKeeperTools instance and perform the bookie recovery Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java Wed Feb 5 21:43:39 2014 @@ -19,7 +19,6 @@ package org.apache.bookkeeper.util; */ import java.io.IOException; -import java.net.InetSocketAddress; /** * Provided utilites for parsing network addresses, ledger-id from node paths @@ -32,34 +31,6 @@ public class StringUtils { static public final String LEDGER_NODE_PREFIX = "L"; /** - * Parses address into IP and port. - * - * @param addr - * String - */ - - public static InetSocketAddress parseAddr(String s) throws IOException { - - String parts[] = s.split(":"); - if (parts.length != 2) { - throw new IOException(s + " does not have the form host:port"); - } - int port; - try { - port = Integer.parseInt(parts[1]); - } catch (NumberFormatException e) { - throw new IOException(s + " does not have the form host:port"); - } - - InetSocketAddress addr = new InetSocketAddress(parts[0], port); - return addr; - } - - public static String addrToString(InetSocketAddress addr) { - return addr.getAddress().getHostAddress() + ":" + addr.getPort(); - } - - /** * Formats ledger ID according to ZooKeeper rules * * @param id Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Wed Feb 5 21:43:39 2014 @@ -1,5 +1,3 @@ -package org.apache.bookkeeper.client; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,36 +18,37 @@ package org.apache.bookkeeper.client; * under the License. * */ +package org.apache.bookkeeper.client; + +import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.MSLedgerManagerFactory; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase; +import org.jboss.netty.buffer.ChannelBuffer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.HashSet; -import java.util.HashMap; -import java.util.Collections; import java.util.Random; - -import org.jboss.netty.buffer.ChannelBuffer; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.meta.MSLedgerManagerFactory; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback; -import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import java.util.concurrent.atomic.AtomicLong; /** * This class tests the bookie recovery admin functionality. @@ -244,7 +243,7 @@ public class BookieRecoveryTest extends for (int i = 0; i < numEntries; i++) { lh.addEntry(data); } - InetSocketAddress bookieToKill = lh.getLedgerMetadata().getEnsemble(numEntries - 1).get(1); + BookieSocketAddress bookieToKill = lh.getLedgerMetadata().getEnsemble(numEntries - 1).get(1); killBookie(bookieToKill); startNewBookie(); for (int i = 0; i < numEntries; i++) { @@ -299,8 +298,10 @@ public class BookieRecoveryTest extends writeEntriestoLedgers(numMsgs, 10, lhs); // Call the async recover bookie method. - InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); - InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort); + BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), + initialPort); + BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), + newBookiePort); LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one (" + bookieDest + ")"); // Initiate the sync object @@ -355,8 +356,9 @@ public class BookieRecoveryTest extends writeEntriestoLedgers(numMsgs, 10, lhs); // Call the async recover bookie method. - InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); - InetSocketAddress bookieDest = null; + BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), + initialPort); + BookieSocketAddress bookieDest = null; LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to a random available one"); // Initiate the sync object @@ -408,8 +410,10 @@ public class BookieRecoveryTest extends writeEntriestoLedgers(numMsgs, 10, lhs); // Call the sync recover bookie method. - InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); - InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort); + BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), + initialPort); + BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), + newBookiePort); LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one (" + bookieDest + ")"); bkAdmin.recoverBookieData(bookieSrc, bookieDest); @@ -454,8 +458,9 @@ public class BookieRecoveryTest extends writeEntriestoLedgers(numMsgs, 10, lhs); // Call the sync recover bookie method. - InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); - InetSocketAddress bookieDest = null; + BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), + initialPort); + BookieSocketAddress bookieDest = null; LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to a random available one"); bkAdmin.recoverBookieData(bookieSrc, bookieDest); @@ -476,7 +481,6 @@ public class BookieRecoveryTest extends @Override public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) { if (LOG.isDebugEnabled()) { - InetSocketAddress addr = (InetSocketAddress)ctx; LOG.debug("Got " + rc + " for ledger " + ledgerId + " entry " + entryId + " from " + ctx); } if (rc == BKException.Code.OK) { @@ -498,7 +502,7 @@ public class BookieRecoveryTest extends private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) throws Exception { LedgerMetadata md = getLedgerMetadata(lh); - Map> ensembles = md.getEnsembles(); + Map> ensembles = md.getEnsembles(); HashMap ranges = new HashMap(); ArrayList keyList = Collections.list( @@ -509,7 +513,7 @@ public class BookieRecoveryTest extends } ranges.put(keyList.get(keyList.size()-1), untilEntry); - for (Map.Entry> e : ensembles.entrySet()) { + for (Map.Entry> e : ensembles.entrySet()) { int quorum = md.getAckQuorumSize(); long startEntryId = e.getKey(); long endEntryId = ranges.get(startEntryId); @@ -518,7 +522,7 @@ public class BookieRecoveryTest extends ReplicationVerificationCallback cb = new ReplicationVerificationCallback(numRequests); for (long i = startEntryId; i < endEntryId; i++) { - for (InetSocketAddress addr : e.getValue()) { + for (BookieSocketAddress addr : e.getValue()) { bkc.bookieClient.readEntry(addr, lh.getId(), i, cb, addr); } } @@ -577,11 +581,11 @@ public class BookieRecoveryTest extends long numDupes = 0; for (LedgerHandle lh : lhs) { LedgerMetadata md = getLedgerMetadata(lh); - for (Map.Entry> e : md.getEnsembles().entrySet()) { - HashSet set = new HashSet(); + for (Map.Entry> e : md.getEnsembles().entrySet()) { + HashSet set = new HashSet(); long fragment = e.getKey(); - for (InetSocketAddress addr : e.getValue()) { + for (BookieSocketAddress addr : e.getValue()) { if (set.contains(addr)) { LOG.error("Dupe " + addr + " found in ensemble for fragment " + fragment + " of ledger " + lh.getId()); @@ -610,15 +614,15 @@ public class BookieRecoveryTest extends closeLedgers(lhs); // Shutdown last bookie server in last ensemble - ArrayList lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles() + ArrayList lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles() .entrySet().iterator().next().getValue(); - InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1); + BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1); killBookie(bookieToKill); // start a new bookie startNewBookie(); - InetSocketAddress bookieDest = null; + BookieSocketAddress bookieDest = null; LOG.info("Now recover the data on the killed bookie (" + bookieToKill + ") and replicate it to a random available one"); @@ -640,15 +644,15 @@ public class BookieRecoveryTest extends writeEntriestoLedgers(numMsgs, 0, lhs); // Shutdown the first bookie server - ArrayList lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles() + ArrayList lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles() .entrySet().iterator().next().getValue(); - InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1); + BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1); killBookie(bookieToKill); // start a new bookie startNewBookie(); - InetSocketAddress bookieDest = null; + BookieSocketAddress bookieDest = null; LOG.info("Now recover the data on the killed bookie (" + bookieToKill + ") and replicate it to a random available one"); @@ -677,13 +681,13 @@ public class BookieRecoveryTest extends writeEntriestoLedgers(numMsgs, 0, lhs); // Shutdown the first bookie server - ArrayList lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles() + ArrayList lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles() .entrySet().iterator().next().getValue(); // removed bookie - InetSocketAddress bookieToKill = lastEnsemble.get(0); + BookieSocketAddress bookieToKill = lastEnsemble.get(0); killBookie(bookieToKill); // temp failure - InetSocketAddress bookieToKill2 = lastEnsemble.get(1); + BookieSocketAddress bookieToKill2 = lastEnsemble.get(1); ServerConfiguration conf2 = killBookie(bookieToKill2); // start a new bookie @@ -720,7 +724,7 @@ public class BookieRecoveryTest extends List newLhs = openLedgers(lhs); for (LedgerHandle newLh : newLhs) { // first ensemble should contains bookieToKill2 and not contain bookieToKill - Map.Entry> entry = + Map.Entry> entry = newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next(); assertFalse(entry.getValue().contains(bookieToKill)); assertTrue(entry.getValue().contains(bookieToKill2)); @@ -745,8 +749,8 @@ public class BookieRecoveryTest extends bs.remove(0); // Call the async recover bookie method. - InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort); - InetSocketAddress bookieDest = null; + BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), + initialPort); LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to a random available one"); // Initiate the sync object @@ -774,7 +778,7 @@ public class BookieRecoveryTest extends // Shutdown the first bookie server LOG.info("Finished writing all ledger entries so shutdown one of the bookies."); int removeIndex = r.nextInt(bs.size()); - InetSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress(); + BookieSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress(); bs.get(removeIndex).shutdown(); bs.remove(removeIndex); @@ -808,7 +812,6 @@ public class BookieRecoveryTest extends byte[] passwdCorrect = "AAAAAA".getBytes(); byte[] passwdBad = "BBBBBB".getBytes(); DigestType digestCorrect = digestType; - DigestType digestBad = (digestType == DigestType.MAC) ? DigestType.CRC32 : DigestType.MAC; LedgerHandle lh = bkc.createLedger(3, 2, digestCorrect, passwdCorrect); long ledgerId = lh.getId(); @@ -817,7 +820,7 @@ public class BookieRecoveryTest extends } lh.close(); - InetSocketAddress bookieSrc = bs.get(0).getLocalAddress(); + BookieSocketAddress bookieSrc = bs.get(0).getLocalAddress(); bs.get(0).shutdown(); bs.remove(0); startNewBookie(); @@ -919,9 +922,9 @@ public class BookieRecoveryTest extends bkc41.close(); // Startup a new bookie server - int newBookiePort = startNewBookie(); + startNewBookie(); int removeIndex = 0; - InetSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress(); + BookieSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress(); bs.get(removeIndex).shutdown(); bs.remove(removeIndex); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java Wed Feb 5 21:43:39 2014 @@ -20,7 +20,6 @@ */ package org.apache.bookkeeper.client; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Enumeration; @@ -28,6 +27,7 @@ import java.util.Random; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase; import org.junit.Before; import org.junit.Test; @@ -107,7 +107,7 @@ public class BookieWriteLedgerTest exten startNewBookie(); // Shutdown three bookies in the last ensemble and continue writing - ArrayList ensemble = lh.getLedgerMetadata() + ArrayList ensemble = lh.getLedgerMetadata() .getEnsembles().entrySet().iterator().next().getValue(); killBookie(ensemble.get(0)); killBookie(ensemble.get(1)); @@ -156,7 +156,7 @@ public class BookieWriteLedgerTest exten startNewBookie(); // Shutdown three bookies in the last ensemble and continue writing - ArrayList ensemble = lh.getLedgerMetadata() + ArrayList ensemble = lh.getLedgerMetadata() .getEnsembles().entrySet().iterator().next().getValue(); killBookie(ensemble.get(0)); killBookie(ensemble.get(1)); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java Wed Feb 5 21:43:39 2014 @@ -18,7 +18,6 @@ package org.apache.bookkeeper.client; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -31,6 +30,7 @@ import org.apache.bookkeeper.client.Asyn import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; @@ -133,7 +133,7 @@ public class LedgerCloseTest extends Boo final CountDownLatch recoverDoneLatch = new CountDownLatch(1); final CountDownLatch failedLatch = new CountDownLatch(1); // kill first bookie to replace with a unauthorize bookie - InetSocketAddress bookie = lh.getLedgerMetadata().currentEnsemble.get(0); + BookieSocketAddress bookie = lh.getLedgerMetadata().currentEnsemble.get(0); ServerConfiguration conf = killBookie(bookie); // replace a unauthorize bookie startUnauthorizedBookie(conf, addDoneLatch); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java Wed Feb 5 21:43:39 2014 @@ -22,7 +22,6 @@ package org.apache.bookkeeper.client; */ import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,6 +32,7 @@ import org.apache.bookkeeper.bookie.Book import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.test.BaseTestCase; import org.junit.Test; @@ -179,7 +179,7 @@ public class LedgerRecoveryTest extends // kill first bookie server to start a fake one to simulate a slow bookie // and failed to add entry on crash // until write succeed - InetSocketAddress host = beforelh.getLedgerMetadata().currentEnsemble.get(slowBookieIdx); + BookieSocketAddress host = beforelh.getLedgerMetadata().currentEnsemble.get(slowBookieIdx); ServerConfiguration conf = killBookie(host); Bookie fakeBookie = new Bookie(conf) { @@ -253,7 +253,7 @@ public class LedgerRecoveryTest extends bs.add(startBookie(conf, deadBookie1)); // kill first bookie server - InetSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0); + BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0); ServerConfiguration conf1 = killBookie(bookie1); // Try to recover and fence the ledger after killing one bookie in the @@ -268,7 +268,7 @@ public class LedgerRecoveryTest extends // restart the first server, kill the second bsConfs.add(conf1); bs.add(startBookie(conf1)); - InetSocketAddress bookie2 = lhbefore.getLedgerMetadata().currentEnsemble.get(1); + BookieSocketAddress bookie2 = lhbefore.getLedgerMetadata().currentEnsemble.get(1); ServerConfiguration conf2 = killBookie(bookie2); // using async, because this could trigger an assertion @@ -334,8 +334,8 @@ public class LedgerRecoveryTest extends bs.add(startBookie(conf, deadBookie1)); // kill first bookie server - InetSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0); - ServerConfiguration conf1 = killBookie(bookie1); + BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0); + killBookie(bookie1); // Try to recover and fence the ledger after killing one bookie in the // ensemble in the ensemble, and another bookie is available in zk but not writtable @@ -386,9 +386,9 @@ public class LedgerRecoveryTest extends fail("Failed to add " + numEntries + " to ledger handle " + lh.getId()); } // kill first 2 bookies to replace bookies - InetSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0); + BookieSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0); ServerConfiguration conf1 = killBookie(bookie1); - InetSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1); + BookieSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1); ServerConfiguration conf2 = killBookie(bookie2); // replace these two bookies Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java?rev=1564946&r1=1564945&r2=1564946&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java Wed Feb 5 21:43:39 2014 @@ -21,22 +21,20 @@ package org.apache.bookkeeper.client; -import java.util.Set; import java.util.List; - +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; -import java.net.InetSocketAddress; -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.conf.ClientConfiguration; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("deprecation") public class SlowBookieTest extends BookKeeperClusterTestCase { @@ -64,7 +62,7 @@ public class SlowBookieTest extends Book final CountDownLatch b0latch = new CountDownLatch(1); final CountDownLatch b1latch = new CountDownLatch(1); final CountDownLatch addEntrylatch = new CountDownLatch(1); - List curEns = lh.getLedgerMetadata().currentEnsemble; + List curEns = lh.getLedgerMetadata().currentEnsemble; try { sleepBookie(curEns.get(0), b0latch); for (int i = 0; i < 10; i++) {