Author: fpj
Date: Wed Feb 5 21:43:39 2014
New Revision: 1564946
URL: http://svn.apache.org/r1564946
Log:
BOOKKEEPER-644: Provide a bookie address wrapper (sijie via fpj)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServerBean.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConditionalSetTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Feb 5 21:43:39 2014
@@ -56,6 +56,8 @@ Trunk (unreleased changes)
BOOKKEEPER-719: Inconsistent synchronization of org.apache.bookkeeper.stats.CodahaleMetricsProvider.metrics (sijie via ivank)
+ BOOKKEEPER-644: Provide a bookie address wrapper (sijie via fpj)
+
bookkeeper-server:
BOOKKEEPER-567: ReadOnlyBookieTest hangs on shutdown (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java Wed Feb 5 21:43:39 2014
@@ -19,13 +19,10 @@
*/
package org.apache.bookkeeper.benchmark;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
import java.io.IOException;
+import java.util.concurrent.Executors;
-import org.apache.zookeeper.KeeperException;
-
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -34,11 +31,6 @@ import org.apache.bookkeeper.client.Book
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -46,6 +38,11 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.cli.ParseException;
+import org.apache.zookeeper.KeeperException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +54,7 @@ public class BenchBookie {
boolean complete;
@Override
synchronized public void writeComplete(int rc, long ledgerId, long entryId,
- InetSocketAddress addr, Object ctx) {
+ BookieSocketAddress addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
}
@@ -78,7 +75,7 @@ public class BenchBookie {
int count;
int waitingCount = Integer.MAX_VALUE;
synchronized public void writeComplete(int rc, long ledgerId, long entryId,
- InetSocketAddress addr, Object ctx) {
+ BookieSocketAddress addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
}
@@ -162,7 +159,7 @@ public class BenchBookie {
toSend.writeLong(ledger);
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
- bc.addEntry(new InetSocketAddress(addr, port), ledger, new byte[20],
+ bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
}
LOG.info("Waiting for warmup");
@@ -180,7 +177,7 @@ public class BenchBookie {
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
lc.resetComplete();
- bc.addEntry(new InetSocketAddress(addr, port), ledger, new byte[20],
+ bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
lc.waitForComplete();
}
@@ -200,7 +197,7 @@ public class BenchBookie {
toSend.writeLong(ledger);
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
- bc.addEntry(new InetSocketAddress(addr, port), ledger, new byte[20],
+ bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
}
tc.waitFor(entryCount);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java Wed Feb 5 21:43:39 2014
@@ -24,28 +24,26 @@ import org.junit.AfterClass;
import org.junit.Test;
import org.junit.Assert;
-import java.net.InetSocketAddress;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
public class TestBenchmark extends BookKeeperClusterTestCase {
protected static final Logger LOG = LoggerFactory.getLogger(TestBenchmark.class);
@@ -68,9 +66,9 @@ public class TestBenchmark extends BookK
@Test(timeout=60000)
public void testBookie() throws Exception {
- InetSocketAddress bookie = getBookie(0);
+ BookieSocketAddress bookie = getBookie(0);
BenchBookie.main(new String[] {
- "--host", bookie.getHostName(),
+ "--host", bookie.getSocketAddress().getHostName(),
"--port", String.valueOf(bookie.getPort()),
"--zookeeper", zkUtil.getZooKeeperConnectString()
});
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Wed Feb 5 21:43:39 2014
@@ -48,11 +48,11 @@ import org.apache.bookkeeper.jmx.BKMBean
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.util.net.DNS;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
@@ -158,7 +158,7 @@ public class Bookie extends BookieCritic
static class NopWriteCallback implements WriteCallback {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
- InetSocketAddress addr, Object ctx) {
+ BookieSocketAddress addr, Object ctx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
new Object[] { entryId, ledgerId, addr, rc });
@@ -225,7 +225,7 @@ public class Bookie extends BookieCritic
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
- InetSocketAddress addr, Object ctx) {
+ BookieSocketAddress addr, Object ctx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
new Object[] { entryId, ledgerId, addr, rc });
@@ -352,16 +352,16 @@ public class Bookie extends BookieCritic
/**
* Return the configured address of the bookie.
*/
- public static InetSocketAddress getBookieAddress(ServerConfiguration conf)
+ public static BookieSocketAddress getBookieAddress(ServerConfiguration conf)
throws UnknownHostException {
String iface = conf.getListeningInterface();
if (iface == null) {
iface = "default";
}
- InetSocketAddress addr = new InetSocketAddress(
- DNS.getDefaultHost(iface),
- conf.getBookiePort());
- if (addr.getAddress().isLoopbackAddress()
+ InetSocketAddress inetAddr = new InetSocketAddress(DNS.getDefaultHost(iface), conf.getBookiePort());
+ BookieSocketAddress addr =
+ new BookieSocketAddress(inetAddr.getAddress().getHostAddress(), conf.getBookiePort());
+ if (addr.getSocketAddress().getAddress().isLoopbackAddress()
&& !conf.getAllowLoopback()) {
throw new UnknownHostException("Trying to listen on loopback address, "
+ addr + " but this is forbidden by default "
@@ -452,7 +452,7 @@ public class Bookie extends BookieCritic
}
private String getMyId() throws UnknownHostException {
- return StringUtils.addrToString(Bookie.getBookieAddress(conf));
+ return Bookie.getBookieAddress(conf).toString();
}
void readJournal() throws IOException, BookieException {
@@ -1108,7 +1108,7 @@ public class Bookie extends BookieCritic
int count;
@Override
- synchronized public void writeComplete(int rc, long l, long e, InetSocketAddress addr, Object ctx) {
+ synchronized public void writeComplete(int rc, long l, long e, BookieSocketAddress addr, Object ctx) {
count--;
if (count == 0) {
notifyAll();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Wed Feb 5 21:43:39 2014
@@ -18,46 +18,42 @@
package org.apache.bookkeeper.bookie;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
-import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
-
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.Tool;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.util.StringUtils;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.cli.Options;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Bookie Shell is to provide utilities for users to administer a bookkeeper cluster.
*/
@@ -262,9 +258,9 @@ public class BookieShell implements Tool
+ "(host:port expected): " + args[0]);
return -1;
}
- final InetSocketAddress bookieSrc = new InetSocketAddress(
+ final BookieSocketAddress bookieSrc = new BookieSocketAddress(
bookieSrcString[0], Integer.parseInt(bookieSrcString[1]));
- InetSocketAddress bookieDest = null;
+ BookieSocketAddress bookieDest = null;
if (args.length >= 2) {
final String bookieDestString[] = args[1].split(":");
if (bookieDestString.length < 2) {
@@ -272,7 +268,7 @@ public class BookieShell implements Tool
+ "(host:port expected): " + args[1]);
return -1;
}
- bookieDest = new InetSocketAddress(bookieDestString[0],
+ bookieDest = new BookieSocketAddress(bookieDestString[0],
Integer.parseInt(bookieDestString[1]));
}
@@ -505,20 +501,20 @@ public class BookieShell implements Tool
BookKeeperAdmin bka = new BookKeeperAdmin(clientconf);
int count = 0;
- Collection<InetSocketAddress> bookies = new ArrayList<InetSocketAddress>();
+ Collection<BookieSocketAddress> bookies = new ArrayList<BookieSocketAddress>();
if (cmdLine.hasOption("rw")) {
- Collection<InetSocketAddress> availableBookies = bka
+ Collection<BookieSocketAddress> availableBookies = bka
.getAvailableBookies();
bookies.addAll(availableBookies);
} else if (cmdLine.hasOption("ro")) {
- Collection<InetSocketAddress> roBookies = bka
+ Collection<BookieSocketAddress> roBookies = bka
.getReadOnlyBookies();
bookies.addAll(roBookies);
}
- for (InetSocketAddress b : bookies) {
- System.out.print(StringUtils.addrToString(b));
+ for (BookieSocketAddress b : bookies) {
+ System.out.print(b);
if (cmdLine.hasOption("h")) {
- System.out.print("\t" + b.getHostName());
+ System.out.print("\t" + b.getSocketAddress().getHostName());
}
System.out.println("");
count++;
@@ -1003,6 +999,7 @@ public class BookieShell implements Tool
for (byte b : data) {
formatter.format("%02x", b);
}
+ formatter.close();
return sb.toString();
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java Wed Feb 5 21:43:39 2014
@@ -20,34 +20,31 @@
*/
package org.apache.bookkeeper.bookie;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
-import java.io.BufferedWriter;
-import java.io.IOException;
import java.io.StringReader;
-
import java.net.UnknownHostException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-
-import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.DataFormats.CookieFormat;
-
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Charsets.UTF_8;
import com.google.protobuf.TextFormat;
/**
@@ -209,7 +206,7 @@ class Cookie {
throws UnknownHostException {
Cookie c = new Cookie();
c.layoutVersion = CURRENT_COOKIE_LAYOUT_VERSION;
- c.bookieHost = StringUtils.addrToString(Bookie.getBookieAddress(conf));
+ c.bookieHost = Bookie.getBookieAddress(conf).toString();
c.journalDir = conf.getJournalDirName();
StringBuilder b = new StringBuilder();
String[] dirs = conf.getLedgerDirNames();
@@ -257,6 +254,6 @@ class Cookie {
throws UnknownHostException {
String bookieCookiePath = conf.getZkLedgersRootPath() + "/"
+ BookKeeperConstants.COOKIE_NODE;
- return bookieCookiePath + "/" + StringUtils.addrToString(Bookie.getBookieAddress(conf));
+ return bookieCookiePath + "/" + Bookie.getBookieAddress(conf);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Wed Feb 5 21:43:39 2014
@@ -20,27 +20,13 @@
*/
package org.apache.bookkeeper.client;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import java.util.UUID;
-import java.util.Collection;
-
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
import org.apache.bookkeeper.client.BookKeeper.SyncOpenCallback;
import org.apache.bookkeeper.client.LedgerFragmentReplicator.SingleFragmentCallback;
import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -50,13 +36,26 @@ import org.apache.bookkeeper.zookeeper.Z
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.UUID;
+
import static com.google.common.base.Charsets.UTF_8;
/**
@@ -71,7 +70,7 @@ public class BookKeeperAdmin {
// BookKeeper client instance
private BookKeeper bkc;
-
+
// LedgerFragmentReplicator instance
private LedgerFragmentReplicator lfr;
@@ -136,7 +135,7 @@ public class BookKeeperAdmin {
/**
* Constructor that takes in a BookKeeper instance . This will be useful,
* when users already has bk instance ready.
- *
+ *
* @param bkc
* - bookkeeper instance
*/
@@ -164,7 +163,7 @@ public class BookKeeperAdmin {
*
* @return a collection of bookie addresses
*/
- public Collection<InetSocketAddress> getAvailableBookies()
+ public Collection<BookieSocketAddress> getAvailableBookies()
throws BKException {
return bkc.bookieWatcher.getBookies();
}
@@ -174,7 +173,7 @@ public class BookKeeperAdmin {
*
* @return a collection of bookie addresses
*/
- public Collection<InetSocketAddress> getReadOnlyBookies() {
+ public Collection<BookieSocketAddress> getReadOnlyBookies() {
return bkc.bookieWatcher.getReadOnlyBookies();
}
@@ -206,12 +205,12 @@ public class BookKeeperAdmin {
public void asyncOpenLedger(final long lId, final OpenCallback cb, final Object ctx) {
new LedgerOpenOp(bkc, lId, cb, ctx).initiate();
}
-
+
/**
* Open a ledger as an administrator. This means that no digest password
* checks are done. Otherwise, the call is identical to
* BookKeeper#openLedger
- *
+ *
* @param lId
* - ledger identifier
* @see BookKeeper#openLedger
@@ -249,12 +248,12 @@ public class BookKeeperAdmin {
public void asyncOpenLedgerNoRecovery(final long lId, final OpenCallback cb, final Object ctx) {
new LedgerOpenOp(bkc, lId, cb, ctx).initiateWithoutRecovery();
}
-
+
/**
* Open a ledger as an administrator without recovering the ledger. This
* means that no digest password checks are done. Otherwise, the call is
* identical to BookKeeper#openLedgerNoRecovery
- *
+ *
* @param lId
* ledger identifier
* @see BookKeeper#openLedgerNoRecovery
@@ -305,7 +304,7 @@ public class BookKeeperAdmin {
* Optional destination bookie that if passed, we will copy all
* of the ledger fragments from the source bookie over to it.
*/
- public void recoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest)
+ public void recoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest)
throws InterruptedException, BKException {
SyncObject sync = new SyncObject();
// Call the async method to recover bookie data.
@@ -356,7 +355,7 @@ public class BookKeeperAdmin {
* @param context
* Context for the RecoverCallback to call.
*/
- public void asyncRecoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+ public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
final RecoverCallback cb, final Object context) {
// Sync ZK to make sure we're reading the latest bookie data.
zk.sync(bookiesPath, new AsyncCallback.VoidCallback() {
@@ -368,7 +367,9 @@ public class BookKeeperAdmin {
return;
}
getAvailableBookies(bookieSrc, bookieDest, cb, context);
- };
+ }
+
+ ;
}, null);
}
@@ -391,9 +392,9 @@ public class BookKeeperAdmin {
* @param context
* Context for the RecoverCallback to call.
*/
- private void getAvailableBookies(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+ private void getAvailableBookies(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
final RecoverCallback cb, final Object context) {
- final List<InetSocketAddress> availableBookies = new LinkedList<InetSocketAddress>();
+ final List<BookieSocketAddress> availableBookies = new LinkedList<BookieSocketAddress>();
if (bookieDest != null) {
availableBookies.add(bookieDest);
// Now poll ZK to get the active ledgers
@@ -414,13 +415,15 @@ public class BookKeeperAdmin {
// exclude the readonly node from available bookies.
continue;
}
- String parts[] = bookieNode.split(BookKeeperConstants.COLON);
- if (parts.length < 2) {
+ BookieSocketAddress addr;
+ try {
+ addr = new BookieSocketAddress(bookieNode);
+ } catch (UnknownHostException nhe) {
LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
cb.recoverComplete(BKException.Code.ZKException, context);
return;
}
- availableBookies.add(new InetSocketAddress(parts[0], Integer.parseInt(parts[1])));
+ availableBookies.add(addr);
}
// Now poll ZK to get the active ledgers
getActiveLedgers(bookieSrc, null, cb, context, availableBookies);
@@ -452,8 +455,8 @@ public class BookKeeperAdmin {
* single bookie server if the user explicitly chose a bookie
* server to replicate data to.
*/
- private void getActiveLedgers(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
- final RecoverCallback cb, final Object context, final List<InetSocketAddress> availableBookies) {
+ private void getActiveLedgers(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
+ final RecoverCallback cb, final Object context, final List<BookieSocketAddress> availableBookies) {
// Wrapper class around the RecoverCallback so it can be used
// as the final VoidCallback to process ledgers
class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
@@ -476,18 +479,18 @@ public class BookKeeperAdmin {
}
};
bkc.getLedgerManager().asyncProcessLedgers(
- ledgerProcessor, new RecoverCallbackWrapper(cb),
- context, BKException.Code.OK, BKException.Code.LedgerRecoveryException);
+ ledgerProcessor, new RecoverCallbackWrapper(cb),
+ context, BKException.Code.OK, BKException.Code.LedgerRecoveryException);
}
/**
* Get a new random bookie, but ensure that it isn't one that is already
* in the ensemble for the ledger.
*/
- private InetSocketAddress getNewBookie(final List<InetSocketAddress> bookiesAlreadyInEnsemble,
- final List<InetSocketAddress> availableBookies)
+ private BookieSocketAddress getNewBookie(final List<BookieSocketAddress> bookiesAlreadyInEnsemble,
+ final List<BookieSocketAddress> availableBookies)
throws BKException.BKNotEnoughBookiesException {
- ArrayList<InetSocketAddress> candidates = new ArrayList<InetSocketAddress>();
+ ArrayList<BookieSocketAddress> candidates = new ArrayList<BookieSocketAddress>();
candidates.addAll(availableBookies);
candidates.removeAll(bookiesAlreadyInEnsemble);
if (candidates.size() == 0) {
@@ -514,8 +517,8 @@ public class BookKeeperAdmin {
* single bookie server if the user explicitly chose a bookie
* server to replicate data to.
*/
- private void recoverLedger(final InetSocketAddress bookieSrc, final long lId,
- final AsyncCallback.VoidCallback ledgerIterCb, final List<InetSocketAddress> availableBookies) {
+ private void recoverLedger(final BookieSocketAddress bookieSrc, final long lId,
+ final AsyncCallback.VoidCallback ledgerIterCb, final List<BookieSocketAddress> availableBookies) {
LOG.debug("Recovering ledger : {}", lId);
asyncOpenLedgerNoRecovery(lId, new OpenCallback() {
@@ -531,7 +534,7 @@ public class BookKeeperAdmin {
if (!lm.isClosed() &&
lm.getEnsembles().size() > 0) {
Long lastKey = lm.getEnsembles().lastKey();
- ArrayList<InetSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
+ ArrayList<BookieSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
// the original write has not removed faulty bookie from
// current ledger ensemble. to avoid data loss issue in
// the case of concurrent updates to the ensemble composition,
@@ -575,7 +578,7 @@ public class BookKeeperAdmin {
*/
Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, Long>();
Long curEntryId = null;
- for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : lh.getLedgerMetadata().getEnsembles()
+ for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : lh.getLedgerMetadata().getEnsembles()
.entrySet()) {
if (curEntryId != null)
ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
@@ -617,25 +620,26 @@ public class BookKeeperAdmin {
*/
for (final Long startEntryId : ledgerFragmentsToRecover) {
Long endEntryId = ledgerFragmentsRange.get(startEntryId);
- InetSocketAddress newBookie = null;
+ BookieSocketAddress newBookie = null;
try {
newBookie = getNewBookie(lh.getLedgerMetadata().getEnsembles().get(startEntryId),
availableBookies);
} catch (BKException.BKNotEnoughBookiesException bke) {
- ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException,
+ ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException,
null, null);
continue;
}
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Replicating fragment from [" + startEntryId
+ LOG.debug("Replicating fragment from [" + startEntryId
+ "," + endEntryId + "] of ledger " + lh.getId()
+ " to " + newBookie);
}
try {
LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
- ArrayList<InetSocketAddress> currentEnsemble = lh.getLedgerMetadata().getEnsemble(startEntryId);
+ ArrayList<BookieSocketAddress> currentEnsemble = lh.getLedgerMetadata().getEnsemble(
+ startEntryId);
int bookieIndex = -1;
if (null != currentEnsemble) {
for (int i = 0; i < currentEnsemble.size(); i++) {
@@ -661,10 +665,10 @@ public class BookKeeperAdmin {
* This method asynchronously recovers a ledger fragment which is a
* contiguous portion of a ledger that was stored in an ensemble that
* included the failed bookie.
- *
+ *
* @param lh
* - LedgerHandle for the ledger
- * @param lf
+ * @param ledgerFragment
* - LedgerFragment to replicate
* @param ledgerFragmentMcb
* - MultiCallback to invoke once we've recovered the current
@@ -676,13 +680,14 @@ public class BookKeeperAdmin {
private void asyncRecoverLedgerFragment(final LedgerHandle lh,
final LedgerFragment ledgerFragment,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final InetSocketAddress newBookie) throws InterruptedException {
+ final BookieSocketAddress newBookie)
+ throws InterruptedException {
lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookie);
}
/**
* Replicate the Ledger fragment to target Bookie passed.
- *
+ *
* @param lh
* - ledgerHandle
* @param ledgerFragment
@@ -692,7 +697,7 @@ public class BookKeeperAdmin {
*/
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
- final InetSocketAddress targetBookieAddress)
+ final BookieSocketAddress targetBookieAddress)
throws InterruptedException, BKException {
SyncCounter syncCounter = new SyncCounter();
ResultCallBack resultCallBack = new ResultCallBack(syncCounter);
@@ -724,7 +729,7 @@ public class BookKeeperAdmin {
/**
* Format the BookKeeper metadata in zookeeper
- *
+ *
* @param isInteractive
* Whether format should ask prompt for confirmation if old data
* exists or not.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Wed Feb 5 21:43:39 2014
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,35 +15,34 @@ package org.apache.bookkeeper.client;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.client;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.bookkeeper.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class is responsible for maintaining a consistent view of what bookies
@@ -58,7 +55,7 @@ class BookieWatcher implements Watcher,
static final Logger logger = LoggerFactory.getLogger(BookieWatcher.class);
public static int ZK_CONNECT_BACKOFF_SEC = 1;
- private static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+ private static final Set<BookieSocketAddress> EMPTY_SET = new HashSet<BookieSocketAddress>();
// Bookie registration path in ZK
private final String bookieRegistrationPath;
@@ -108,7 +105,7 @@ class BookieWatcher implements Watcher,
}
}
- public Collection<InetSocketAddress> getBookies() throws BKException {
+ public Collection<BookieSocketAddress> getBookies() throws BKException {
try {
List<String> children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false);
children.remove(BookKeeperConstants.READONLY);
@@ -123,8 +120,8 @@ class BookieWatcher implements Watcher,
}
}
- Collection<InetSocketAddress> getReadOnlyBookies() {
- return new HashSet<InetSocketAddress>(readOnlyBookieWatcher.getReadOnlyBookies());
+ Collection<BookieSocketAddress> getReadOnlyBookies() {
+ return new HashSet<BookieSocketAddress>(readOnlyBookieWatcher.getReadOnlyBookies());
}
public void readBookies() {
@@ -154,11 +151,11 @@ class BookieWatcher implements Watcher,
// available nodes list.
children.remove(BookKeeperConstants.READONLY);
- HashSet<InetSocketAddress> newBookieAddrs = convertToBookieAddresses(children);
+ HashSet<BookieSocketAddress> newBookieAddrs = convertToBookieAddresses(children);
- final Set<InetSocketAddress> deadBookies;
+ final Set<BookieSocketAddress> deadBookies;
synchronized (this) {
- Set<InetSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
+ Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
deadBookies = placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
}
@@ -167,13 +164,13 @@ class BookieWatcher implements Watcher,
}
}
- private static HashSet<InetSocketAddress> convertToBookieAddresses(List<String> children) {
+ private static HashSet<BookieSocketAddress> convertToBookieAddresses(List<String> children) {
// Read the bookie addresses into a set for efficient lookup
- HashSet<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
+ HashSet<BookieSocketAddress> newBookieAddrs = new HashSet<BookieSocketAddress>();
for (String bookieAddrString : children) {
- InetSocketAddress bookieAddr;
+ BookieSocketAddress bookieAddr;
try {
- bookieAddr = StringUtils.parseAddr(bookieAddrString);
+ bookieAddr = new BookieSocketAddress(bookieAddrString);
} catch (IOException e) {
logger.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie");
continue;
@@ -212,7 +209,8 @@ class BookieWatcher implements Watcher,
}
/**
- * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when there is no exclusion list (or exisiting bookies)
+ * Create an ensemble with given <i>ensembleSize</i> and <i>writeQuorumSize</i>.
+ *
* @param ensembleSize
* Ensemble Size
* @param writeQuorumSize
@@ -220,21 +218,24 @@ class BookieWatcher implements Watcher,
* @return list of bookies for new ensemble.
* @throws BKNotEnoughBookiesException
*/
- public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize)
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize)
throws BKNotEnoughBookiesException {
return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET);
}
/**
- * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when you just need 1 extra bookie
+ * Choose a bookie to replace bookie <i>bookieIdx</i> in <i>existingBookies</i>.
* @param existingBookies
- * @return
+ * list of existing bookies.
+ * @param bookieIdx
+ * index of the bookie in the list to be replaced.
+ * @return the bookie to replace.
* @throws BKNotEnoughBookiesException
*/
- public InetSocketAddress replaceBookie(List<InetSocketAddress> existingBookies, int bookieIdx)
+ public BookieSocketAddress replaceBookie(List<BookieSocketAddress> existingBookies, int bookieIdx)
throws BKNotEnoughBookiesException {
- InetSocketAddress addr = existingBookies.get(bookieIdx);
- return placementPolicy.replaceBookie(addr, new HashSet<InetSocketAddress>(existingBookies));
+ BookieSocketAddress addr = existingBookies.get(bookieIdx);
+ return placementPolicy.replaceBookie(addr, new HashSet<BookieSocketAddress>(existingBookies));
}
/**
@@ -244,7 +245,7 @@ class BookieWatcher implements Watcher,
private static class ReadOnlyBookieWatcher implements Watcher, ChildrenCallback {
private final static Logger LOG = LoggerFactory.getLogger(ReadOnlyBookieWatcher.class);
- private HashSet<InetSocketAddress> readOnlyBookies = new HashSet<InetSocketAddress>();
+ private HashSet<BookieSocketAddress> readOnlyBookies = new HashSet<BookieSocketAddress>();
private BookKeeper bk;
private String readOnlyBookieRegPath;
@@ -307,12 +308,12 @@ class BookieWatcher implements Watcher,
return;
}
- HashSet<InetSocketAddress> newReadOnlyBookies = convertToBookieAddresses(children);
+ HashSet<BookieSocketAddress> newReadOnlyBookies = convertToBookieAddresses(children);
readOnlyBookies = newReadOnlyBookies;
}
// returns the readonly bookies
- public HashSet<InetSocketAddress> getReadOnlyBookies() {
+ public HashSet<BookieSocketAddress> getReadOnlyBookies() {
return readOnlyBookies;
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java Wed Feb 5 21:43:39 2014
@@ -17,15 +17,14 @@
*/
package org.apache.bookkeeper.client;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.commons.configuration.Configuration;
/**
@@ -33,23 +32,23 @@ import org.apache.commons.configuration.
*/
public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
- static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+ static final Set<BookieSocketAddress> EMPTY_SET = new HashSet<BookieSocketAddress>();
- private Set<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+ private Set<BookieSocketAddress> knownBookies = new HashSet<BookieSocketAddress>();
@Override
- public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
- Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
- ArrayList<InetSocketAddress> newBookies = new ArrayList<InetSocketAddress>(ensembleSize);
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+ Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ ArrayList<BookieSocketAddress> newBookies = new ArrayList<BookieSocketAddress>(ensembleSize);
if (ensembleSize <= 0) {
return newBookies;
}
- List<InetSocketAddress> allBookies;
+ List<BookieSocketAddress> allBookies;
synchronized (this) {
- allBookies = new ArrayList<InetSocketAddress>(knownBookies);
+ allBookies = new ArrayList<BookieSocketAddress>(knownBookies);
}
Collections.shuffle(allBookies);
- for (InetSocketAddress bookie : allBookies) {
+ for (BookieSocketAddress bookie : allBookies) {
if (excludeBookies.contains(bookie)) {
continue;
}
@@ -63,17 +62,17 @@ public class DefaultEnsemblePlacementPol
}
@Override
- public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
- Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
- ArrayList<InetSocketAddress> addresses = newEnsemble(1, 1, excludeBookies);
+ public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
+ Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, excludeBookies);
return addresses.get(0);
}
@Override
- public synchronized Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
- Set<InetSocketAddress> readOnlyBookies) {
- HashSet<InetSocketAddress> deadBookies;
- deadBookies = new HashSet<InetSocketAddress>(knownBookies);
+ public synchronized Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
+ Set<BookieSocketAddress> readOnlyBookies) {
+ HashSet<BookieSocketAddress> deadBookies;
+ deadBookies = new HashSet<BookieSocketAddress>(knownBookies);
deadBookies.removeAll(writableBookies);
// readonly bookies should not be treated as dead bookies
deadBookies.removeAll(readOnlyBookies);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java Wed Feb 5 21:43:39 2014
@@ -17,12 +17,11 @@
*/
package org.apache.bookkeeper.client;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.commons.configuration.Configuration;
/**
@@ -55,8 +54,8 @@ public interface EnsemblePlacementPolicy
* All the bookies in the cluster available for readonly.
* @return the dead bookies during this cluster change.
*/
- public Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
- Set<InetSocketAddress> readOnlyBookies);
+ public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
+ Set<BookieSocketAddress> readOnlyBookies);
/**
* Choose <i>numBookies</i> bookies for ensemble. If the count is more than the number of available
@@ -71,8 +70,8 @@ public interface EnsemblePlacementPolicy
* @return list of bookies chosen as targets.
* @throws BKNotEnoughBookiesException if not enough bookies available.
*/
- public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
- Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+ Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
/**
* Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie available in the cluster,
@@ -85,6 +84,6 @@ public interface EnsemblePlacementPolicy
* @return the bookie chosen as target.
* @throws BKNotEnoughBookiesException
*/
- public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
- Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+ public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
+ Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java Wed Feb 5 21:43:39 2014
@@ -21,22 +21,20 @@ package org.apache.bookkeeper.client;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Set;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
*Checks the complete ledger and finds the UnderReplicated fragments if any
*/
@@ -177,8 +175,8 @@ public class LedgerChecker {
final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
Long curEntryId = null;
- ArrayList<InetSocketAddress> curEnsemble = null;
- for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : lh
+ ArrayList<BookieSocketAddress> curEnsemble = null;
+ for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : lh
.getLedgerMetadata().getEnsembles().entrySet()) {
if (curEntryId != null) {
for (int i = 0; i < curEnsemble.size(); i++) {
@@ -234,7 +232,7 @@ public class LedgerChecker {
});
for (int bi : lh.getDistributionSchedule().getWriteSet(entryToRead)) {
- InetSocketAddress addr = curEnsemble.get(bi);
+ BookieSocketAddress addr = curEnsemble.get(bi);
bookieClient.readEntry(addr, lh.getId(),
entryToRead, eecb, null);
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Wed Feb 5 21:43:39 2014
@@ -21,13 +21,13 @@
package org.apache.bookkeeper.client;
-import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
@@ -95,7 +95,7 @@ class LedgerCreateOp implements GenericC
* Adding bookies to ledger handle
*/
- ArrayList<InetSocketAddress> ensemble;
+ ArrayList<BookieSocketAddress> ensemble;
try {
ensemble = bk.bookieWatcher
.newEnsemble(metadata.getEnsembleSize(), metadata.getWriteQuorumSize());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java Wed Feb 5 21:43:39 2014
@@ -19,11 +19,12 @@
*/
package org.apache.bookkeeper.client;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+
/**
* Represents the entries of a segment of a ledger which are stored on a single
* bookie in the segments bookie ensemble.
@@ -32,7 +33,7 @@ import java.util.SortedMap;
*/
public class LedgerFragment {
private final int bookieIndex;
- private final List<InetSocketAddress> ensemble;
+ private final List<BookieSocketAddress> ensemble;
private final long firstEntryId;
private final long lastKnownEntryId;
private final long ledgerId;
@@ -47,7 +48,7 @@ public class LedgerFragment {
this.bookieIndex = bookieIndex;
this.ensemble = lh.getLedgerMetadata().getEnsemble(firstEntryId);
this.schedule = lh.getDistributionSchedule();
- SortedMap<Long, ArrayList<InetSocketAddress>> ensembles = lh
+ SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh
.getLedgerMetadata().getEnsembles();
this.isLedgerClosed = lh.getLedgerMetadata().isClosed()
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
@@ -82,7 +83,7 @@ public class LedgerFragment {
/**
* Gets the failedBookie address
*/
- public InetSocketAddress getAddress() {
+ public BookieSocketAddress getAddress() {
return ensemble.get(bookieIndex);
}
@@ -133,7 +134,7 @@ public class LedgerFragment {
*
* @return the ensemble for the segment which this fragment is a part of
*/
- public List<InetSocketAddress> getEnsemble() {
+ public List<BookieSocketAddress> getEnsemble() {
return this.ensemble;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java Wed Feb 5 21:43:39 2014
@@ -19,7 +19,6 @@
*/
package org.apache.bookkeeper.client;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
@@ -29,12 +28,12 @@ import java.util.List;
import java.util.Set;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
-
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -60,7 +59,7 @@ public class LedgerFragmentReplicator {
private void replicateFragmentInternal(final LedgerHandle lh,
final LedgerFragment lf,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final InetSocketAddress newBookie) throws InterruptedException {
+ final BookieSocketAddress newBookie) throws InterruptedException {
if (!lf.isClosed()) {
LOG.error("Trying to replicate an unclosed fragment;"
+ " This is not safe {}", lf);
@@ -133,7 +132,7 @@ public class LedgerFragmentReplicator {
*/
void replicate(final LedgerHandle lh, final LedgerFragment lf,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final InetSocketAddress targetBookieAddress)
+ final BookieSocketAddress targetBookieAddress)
throws InterruptedException {
Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf,
bkc.getConf().getRereplicationEntryBatchSize());
@@ -147,7 +146,7 @@ public class LedgerFragmentReplicator {
private void replicateNextBatch(final LedgerHandle lh,
final Iterator<LedgerFragment> fragments,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final InetSocketAddress targetBookieAddress) {
+ final BookieSocketAddress targetBookieAddress) {
if (fragments.hasNext()) {
try {
replicateFragmentInternal(lh, fragments.next(),
@@ -238,7 +237,7 @@ public class LedgerFragmentReplicator {
private void recoverLedgerFragmentEntry(final Long entryId,
final LedgerHandle lh,
final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
- final InetSocketAddress newBookie) throws InterruptedException {
+ final BookieSocketAddress newBookie) throws InterruptedException {
/*
* Read the ledger entry using the LedgerHandle. This will allow us to
* read the entry from one of the other replicated bookies other than
@@ -269,7 +268,7 @@ public class LedgerFragmentReplicator {
new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId,
- long entryId, InetSocketAddress addr,
+ long entryId, BookieSocketAddress addr,
Object ctx) {
if (rc != Code.OK.intValue()) {
LOG.error(
@@ -308,12 +307,12 @@ public class LedgerFragmentReplicator {
final AsyncCallback.VoidCallback ledgerFragmentsMcb;
final LedgerHandle lh;
final long fragmentStartId;
- final InetSocketAddress oldBookie;
- final InetSocketAddress newBookie;
+ final BookieSocketAddress oldBookie;
+ final BookieSocketAddress newBookie;
SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
LedgerHandle lh, long fragmentStartId,
- InetSocketAddress oldBookie, InetSocketAddress newBookie) {
+ BookieSocketAddress oldBookie, BookieSocketAddress newBookie) {
this.ledgerFragmentsMcb = ledgerFragmentsMcb;
this.lh = lh;
this.fragmentStartId = fragmentStartId;
@@ -337,13 +336,13 @@ public class LedgerFragmentReplicator {
/** Updates the ensemble with newBookie and notify the ensembleUpdatedCb */
private static void updateEnsembleInfo(
AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
- LedgerHandle lh, InetSocketAddress oldBookie,
- InetSocketAddress newBookie) {
+ LedgerHandle lh, BookieSocketAddress oldBookie,
+ BookieSocketAddress newBookie) {
/*
* Update the ledger metadata's ensemble info to point to the new
* bookie.
*/
- ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata()
+ ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
.getEnsembles().get(fragmentStartId);
int deadBookieIndex = ensemble.indexOf(oldBookie);
ensemble.remove(deadBookieIndex);
@@ -361,12 +360,12 @@ public class LedgerFragmentReplicator {
final AsyncCallback.VoidCallback ensembleUpdatedCb;
final LedgerHandle lh;
final long fragmentStartId;
- final InetSocketAddress oldBookie;
- final InetSocketAddress newBookie;
+ final BookieSocketAddress oldBookie;
+ final BookieSocketAddress newBookie;
public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
long fragmentStartId, LedgerHandle lh,
- InetSocketAddress oldBookie, InetSocketAddress newBookie) {
+ BookieSocketAddress oldBookie, BookieSocketAddress newBookie) {
this.ensembleUpdatedCb = ledgerFragmentsMcb;
this.lh = lh;
this.fragmentStartId = fragmentStartId;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Feb 5 21:43:39 2014
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.client;
import static com.google.common.base.Charsets.UTF_8;
-import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,6 +35,7 @@ import org.apache.bookkeeper.client.Asyn
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
@@ -677,11 +677,11 @@ public class LedgerHandle {
}
- ArrayList<InetSocketAddress> replaceBookieInMetadata(final InetSocketAddress addr, final int bookieIndex)
+ ArrayList<BookieSocketAddress> replaceBookieInMetadata(final BookieSocketAddress addr, final int bookieIndex)
throws BKException.BKNotEnoughBookiesException {
- InetSocketAddress newBookie;
+ BookieSocketAddress newBookie;
LOG.info("Handling failure of bookie: {} index: {}", addr, bookieIndex);
- final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>();
+ final ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>();
final long newEnsembleStartEntry = lastAddConfirmed + 1;
// avoid parallel ensemble changes to same ensemble.
@@ -702,7 +702,7 @@ public class LedgerHandle {
return newEnsemble;
}
- void handleBookieFailure(final InetSocketAddress addr, final int bookieIndex) {
+ void handleBookieFailure(final BookieSocketAddress addr, final int bookieIndex) {
blockAddCompletions.incrementAndGet();
synchronized (metadata) {
@@ -715,7 +715,7 @@ public class LedgerHandle {
}
try {
- ArrayList<InetSocketAddress> newEnsemble = replaceBookieInMetadata(addr, bookieIndex);
+ ArrayList<BookieSocketAddress> newEnsemble = replaceBookieInMetadata(addr, bookieIndex);
EnsembleInfo ensembleInfo = new EnsembleInfo(newEnsemble, bookieIndex,
addr);
@@ -731,12 +731,12 @@ public class LedgerHandle {
// Contains newly reformed ensemble, bookieIndex, failedBookieAddress
private static final class EnsembleInfo {
- private final ArrayList<InetSocketAddress> newEnsemble;
+ private final ArrayList<BookieSocketAddress> newEnsemble;
private final int bookieIndex;
- private final InetSocketAddress addr;
+ private final BookieSocketAddress addr;
- public EnsembleInfo(ArrayList<InetSocketAddress> newEnsemble,
- int bookieIndex, InetSocketAddress addr) {
+ public EnsembleInfo(ArrayList<BookieSocketAddress> newEnsemble, int bookieIndex,
+ BookieSocketAddress addr) {
this.newEnsemble = newEnsemble;
this.bookieIndex = bookieIndex;
this.addr = addr;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Wed Feb 5 21:43:39 2014
@@ -17,12 +17,17 @@
*/
package org.apache.bookkeeper.client;
-import static com.google.common.base.Charsets.UTF_8;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.TextFormat;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.bookkeeper.versioning.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -31,14 +36,7 @@ import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
-import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.versioning.Version;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.TextFormat;
+import static com.google.common.base.Charsets.UTF_8;
/**
* This class encapsulates all the ledger metadata that is persistently stored
@@ -70,8 +68,9 @@ public class LedgerMetadata {
private long lastEntryId;
private LedgerMetadataFormat.State state;
- private SortedMap<Long, ArrayList<InetSocketAddress>> ensembles = new TreeMap<Long, ArrayList<InetSocketAddress>>();
- ArrayList<InetSocketAddress> currentEnsemble;
+ private SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles =
+ new TreeMap<Long, ArrayList<BookieSocketAddress>>();
+ ArrayList<BookieSocketAddress> currentEnsemble;
volatile Version version = Version.NEW;
private boolean hasPassword = false;
@@ -116,9 +115,9 @@ public class LedgerMetadata {
this.password = new byte[other.password.length];
System.arraycopy(other.password, 0, this.password, 0, other.password.length);
// copy the ensembles
- for (Entry<Long, ArrayList<InetSocketAddress>> entry : other.ensembles.entrySet()) {
+ for (Entry<Long, ArrayList<BookieSocketAddress>> entry : other.ensembles.entrySet()) {
long startEntryId = entry.getKey();
- ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(entry.getValue());
+ ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(entry.getValue());
this.addEnsemble(startEntryId, newEnsemble);
}
}
@@ -135,11 +134,11 @@ public class LedgerMetadata {
* @return SortedMap of Ledger Fragments and the corresponding
* bookie ensembles that store the entries.
*/
- public SortedMap<Long, ArrayList<InetSocketAddress>> getEnsembles() {
+ public SortedMap<Long, ArrayList<BookieSocketAddress>> getEnsembles() {
return ensembles;
}
- void setEnsembles(SortedMap<Long, ArrayList<InetSocketAddress>> ensembles) {
+ void setEnsembles(SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles) {
this.ensembles = ensembles;
}
@@ -214,14 +213,14 @@ public class LedgerMetadata {
state = LedgerMetadataFormat.State.CLOSED;
}
- void addEnsemble(long startEntryId, ArrayList<InetSocketAddress> ensemble) {
+ void addEnsemble(long startEntryId, ArrayList<BookieSocketAddress> ensemble) {
assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
ensembles.put(startEntryId, ensemble);
currentEnsemble = ensemble;
}
- ArrayList<InetSocketAddress> getEnsemble(long entryId) {
+ ArrayList<BookieSocketAddress> getEnsemble(long entryId) {
// the head map cannot be empty, since we insert an ensemble for
// entry-id 0, right when we start
return ensembles.get(ensembles.headMap(entryId + 1).lastKey());
@@ -235,7 +234,7 @@ public class LedgerMetadata {
* @return
*/
long getNextEnsembleChange(long entryId) {
- SortedMap<Long, ArrayList<InetSocketAddress>> tailMap = ensembles.tailMap(entryId + 1);
+ SortedMap<Long, ArrayList<BookieSocketAddress>> tailMap = ensembles.tailMap(entryId + 1);
if (tailMap.isEmpty()) {
return -1;
@@ -262,11 +261,11 @@ public class LedgerMetadata {
builder.setDigestType(digestType).setPassword(ByteString.copyFrom(password));
}
- for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : ensembles.entrySet()) {
+ for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
segmentBuilder.setFirstEntryId(entry.getKey());
- for (InetSocketAddress addr : entry.getValue()) {
- segmentBuilder.addEnsembleMember(StringUtils.addrToString(addr));
+ for (BookieSocketAddress addr : entry.getValue()) {
+ segmentBuilder.addEnsembleMember(addr.toString());
}
builder.addSegment(segmentBuilder.build());
}
@@ -283,11 +282,11 @@ public class LedgerMetadata {
s.append(VERSION_KEY).append(tSplitter).append(metadataFormatVersion).append(lSplitter);
s.append(writeQuorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
- for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : ensembles.entrySet()) {
+ for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
s.append(lSplitter).append(entry.getKey());
- for (InetSocketAddress addr : entry.getValue()) {
+ for (BookieSocketAddress addr : entry.getValue()) {
s.append(tSplitter);
- s.append(StringUtils.addrToString(addr));
+ s.append(addr.toString());
}
}
@@ -325,7 +324,6 @@ public class LedgerMetadata {
if (versionLine == null) {
throw new IOException("Invalid metadata. Content missing");
}
- int i = 0;
if (versionLine.startsWith(VERSION_KEY)) {
String parts[] = versionLine.split(tSplitter);
lc.metadataFormatVersion = new Integer(parts[1]);
@@ -372,9 +370,9 @@ public class LedgerMetadata {
}
for (LedgerMetadataFormat.Segment s : data.getSegmentList()) {
- ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+ ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>();
for (String member : s.getEnsembleMemberList()) {
- addrs.add(StringUtils.parseAddr(member));
+ addrs.add(new BookieSocketAddress(member));
}
lc.addEnsemble(s.getFirstEntryId(), addrs);
}
@@ -405,9 +403,9 @@ public class LedgerMetadata {
lc.state = LedgerMetadataFormat.State.OPEN;
}
- ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+ ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>();
for (int j = 1; j < parts.length; j++) {
- addrs.add(StringUtils.parseAddr(parts[j]));
+ addrs.add(new BookieSocketAddress(parts[j]));
}
lc.addEnsemble(new Long(parts[0]), addrs);
line = reader.readLine();
@@ -420,7 +418,7 @@ public class LedgerMetadata {
/**
* Updates the version of this metadata.
- *
+ *
* @param v Version
*/
public void setVersion(Version v) {
@@ -429,7 +427,7 @@ public class LedgerMetadata {
/**
* Returns the last version.
- *
+ *
* @return version
*/
public Version getVersion() {
@@ -507,7 +505,7 @@ public class LedgerMetadata {
return sb.toString();
}
- void mergeEnsembles(SortedMap<Long, ArrayList<InetSocketAddress>> newEnsembles) {
+ void mergeEnsembles(SortedMap<Long, ArrayList<BookieSocketAddress>> newEnsembles) {
// allow new metadata to be one ensemble less than current metadata
// since ensemble change might kick in when recovery changed metadata
int diff = ensembles.size() - newEnsembles.size();
@@ -515,13 +513,13 @@ public class LedgerMetadata {
return;
}
int i = 0;
- for (Entry<Long, ArrayList<InetSocketAddress>> entry : newEnsembles.entrySet()) {
+ for (Entry<Long, ArrayList<BookieSocketAddress>> entry : newEnsembles.entrySet()) {
++i;
if (ensembles.size() != i) {
// we should use last ensemble from current metadata
// not the new metadata read from zookeeper
long key = entry.getKey();
- ArrayList<InetSocketAddress> ensemble = entry.getValue();
+ ArrayList<BookieSocketAddress> ensemble = entry.getValue();
ensembles.put(key, ensemble);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=1564946&r1=1564945&r2=1564946&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java Wed Feb 5 21:43:39 2014
@@ -19,8 +19,9 @@ package org.apache.bookkeeper.client;
import java.util.HashSet;
import java.util.Set;
-import java.net.InetSocketAddress;
+
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -138,7 +139,7 @@ class PendingAddOp implements WriteCallb
}
@Override
- public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
+ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
int bookieIndex = (Integer) ctx;
if (completed) {
|