Author: todd
Date: Thu Dec 1 21:26:08 2011
New Revision: 1209249
URL: http://svn.apache.org/viewvc?rev=1209249&view=rev
Log:
HDFS-2612. Handle refreshNameNodes in federated HA clusters. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1209249&r1=1209248&r2=1209249&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
Thu Dec 1 21:26:08 2011
@@ -35,3 +35,5 @@ HDFS-1971. Send block report from datano
HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return HeartbeatResponse. (suresh)
HDFS-2622. Fix TestDFSUpgrade in HA branch. (todd)
+
+HDFS-2612. Handle refreshNameNodes in federated HA clusters (todd)
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1209249&r1=1209248&r2=1209249&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Thu Dec 1 21:26:08 2011
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.da
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
@@ -42,6 +44,8 @@ import org.apache.hadoop.ipc.RPC;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* One instance per block-pool/namespace on the DN, which handles the
@@ -89,6 +93,21 @@ class BPOfferService {
this.bpServiceToActive = this.bpServices.get(0);
}
+ /**
+  * Validates a refreshed list of NameNode addresses against the addresses
+  * of the actors this service already has.
+  * <p>
+  * Changing the set of NameNodes of a running service is not implemented:
+  * the call succeeds only when both address sets are identical.
+  *
+  * @param addrs NameNode addresses taken from the refreshed configuration
+  * @throws IOException if {@code addrs} is not exactly the set of addresses
+  *         of the current actors
+  */
+ void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
+   Set<InetSocketAddress> current = Sets.newHashSet();
+   for (BPServiceActor existing : bpServices) {
+     current.add(existing.getNNSocketAddress());
+   }
+   Set<InetSocketAddress> requested = Sets.newHashSet(addrs);
+
+   // Two sets are equal exactly when their symmetric difference is empty.
+   if (current.equals(requested)) {
+     return;
+   }
+
+   // Keep things simple for now -- we can implement this at a later date.
+   throw new IOException(
+       "HA does not currently support adding a new standby to a running DN. " +
+       "Please do a rolling restart of DNs to reconfigure the list of NNs.");
+ }
+
/**
* returns true if BP thread has completed initialization of storage
* and has registered with the corresponding namenode
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1209249&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
(added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
Thu Dec 1 21:26:08 2011
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Manages the BPOfferService objects for the data node.
+ * Creation, removal, starting, stopping, shutdown on BPOfferService
+ * objects must be done via APIs in this class.
+ * <p>
+ * The collections held here are only read or written from synchronized
+ * methods and blocks of this class. Refreshes are additionally serialized
+ * by a separate lock, so that a refresh can release this object's monitor
+ * while it waits for retired services to stop.
+ */
+@InterfaceAudience.Private
+class BlockPoolManager {
+  private static final Log LOG = DataNode.LOG;
+
+  /** Offer services keyed by the nameservice ID they were created for. */
+  private final Map<String, BPOfferService> bpByNameserviceId =
+    Maps.newHashMap();
+  /** Offer services keyed by block pool ID; filled in by addBlockPool. */
+  private final Map<String, BPOfferService> bpByBlockPoolId =
+    Maps.newHashMap();
+  /** Every offer service created by a refresh and not yet removed. */
+  private final List<BPOfferService> offerServices =
+    Lists.newArrayList();
+
+  private final DataNode dn;
+
+  //This lock is used only to ensure exclusion of refreshNamenodes
+  private final Object refreshNamenodesLock = new Object();
+
+  BlockPoolManager(DataNode dn) {
+    this.dn = dn;
+  }
+
+  /**
+   * Indexes an already-managed offer service under its block pool ID.
+   *
+   * @param bpos a service that is currently managed by this object
+   * @throws IllegalArgumentException if {@code bpos} is not managed by this
+   *         object, or if it does not have a block pool ID
+   */
+  synchronized void addBlockPool(BPOfferService bpos) {
+    Preconditions.checkArgument(offerServices.contains(bpos),
+        "Unknown BPOS: %s", bpos);
+    if (bpos.getBlockPoolId() == null) {
+      throw new IllegalArgumentException("Null blockpool id");
+    }
+    bpByBlockPoolId.put(bpos.getBlockPoolId(), bpos);
+  }
+
+  /**
+   * Returns the array of BPOfferService objects.
+   * Caution: The BPOfferService returned could be shutdown any time.
+   */
+  synchronized BPOfferService[] getAllNamenodeThreads() {
+    return offerServices.toArray(new BPOfferService[offerServices.size()]);
+  }
+
+  /**
+   * @param bpid a block pool ID
+   * @return the offer service registered under {@code bpid} via
+   *         addBlockPool, or null if there is none
+   */
+  synchronized BPOfferService get(String bpid) {
+    return bpByBlockPoolId.get(bpid);
+  }
+
+  /**
+   * @param addr a NameNode address
+   * @return the first managed offer service that reports containing
+   *         {@code addr}, or null if none does
+   */
+  // TODO(HA) would be good to kill this
+  synchronized BPOfferService get(InetSocketAddress addr) {
+    for (BPOfferService bpos : offerServices) {
+      if (bpos.containsNN(addr)) {
+        return bpos;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Drops every reference this manager holds to the given offer service.
+   * A warning is logged when the service is not found in the
+   * nameservice map.
+   *
+   * @param t the offer service to forget
+   */
+  synchronized void remove(BPOfferService t) {
+    offerServices.remove(t);
+    bpByBlockPoolId.remove(t.getBlockPoolId());
+
+    // The nameservice ID of t is not known here, so search by identity.
+    boolean removed = false;
+    for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();
+         it.hasNext() && !removed;) {
+      BPOfferService bpos = it.next();
+      if (bpos == t) {
+        it.remove();
+        LOG.info("Removed " + bpos);
+        removed = true;
+      }
+    }
+
+    if (!removed) {
+      LOG.warn("Couldn't remove BPOS " + t + " from bpByNameserviceId map");
+    }
+  }
+
+  /**
+   * Stops every managed offer service, then waits for each of them.
+   * The list of services is snapshotted first, so the monitor is not held
+   * while stopping or joining.
+   */
+  void shutDownAll() throws InterruptedException {
+    BPOfferService[] bposArray = this.getAllNamenodeThreads();
+
+    for (BPOfferService bpos : bposArray) {
+      bpos.stop(); //interrupts the threads
+    }
+    //now join
+    for (BPOfferService bpos : bposArray) {
+      bpos.join();
+    }
+  }
+
+  /**
+   * Calls start() on every managed offer service, running as the login user.
+   *
+   * @throws IOException if one is propagated by the doAs call, or if the
+   *         calling thread is interrupted; in the latter case the thread's
+   *         interrupt status is restored before the exception is thrown
+   */
+  synchronized void startAll() throws IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(
+          new PrivilegedExceptionAction<Object>() {
+            @Override
+            public Object run() throws Exception {
+              for (BPOfferService bpos : offerServices) {
+                bpos.start();
+              }
+              return null;
+            }
+          });
+    } catch (InterruptedException ex) {
+      // Chain the InterruptedException itself: it normally has no cause of
+      // its own, so wrapping ex.getCause() would discard all context.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while starting BPOfferServices", ex);
+    }
+  }
+
+  /**
+   * Waits for every offer service that is managed at the time of the call.
+   */
+  void joinAll() {
+    for (BPOfferService bpos: this.getAllNamenodeThreads()) {
+      bpos.join();
+    }
+  }
+
+  /**
+   * Brings the managed offer services in line with the nameservices found
+   * in the given configuration. Concurrent refreshes are executed one at a
+   * time.
+   *
+   * @param conf configuration to read the NameNode RPC addresses from
+   * @throws IOException if starting the services fails, or if an existing
+   *         nameservice rejects its refreshed NameNode list
+   */
+  void refreshNamenodes(Configuration conf)
+      throws IOException {
+    LOG.info("Refresh request received for nameservices: "
+        + conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES));
+
+    Map<String, Map<String, InetSocketAddress>> newAddressMap =
+      DFSUtil.getNNServiceRpcAddresses(conf);
+
+    synchronized (refreshNamenodesLock) {
+      doRefreshNamenodes(newAddressMap);
+    }
+  }
+
+  /**
+   * Performs the refresh. The caller must hold refreshNamenodesLock.
+   *
+   * @param addrMap nameservice ID to (NameNode ID to RPC address)
+   * @throws IOException if starting the services fails, or if an existing
+   *         nameservice rejects its refreshed NameNode list
+   */
+  private void doRefreshNamenodes(
+      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
+    assert Thread.holdsLock(refreshNamenodesLock);
+
+    Set<String> toAdd = Sets.newHashSet();
+    // Nameservice ID -> service, captured while the monitor is held so that
+    // steps 4 and 5 never read the shared maps after it has been released
+    // (retiring services call remove() concurrently from their own threads).
+    Map<String, BPOfferService> toRefresh = Maps.newHashMap();
+    Map<String, BPOfferService> toRemove = Maps.newHashMap();
+
+    synchronized (this) {
+      // Step 1. For each of the new nameservices, figure out whether
+      // it's an update of the set of NNs for an existing NS,
+      // or an entirely new nameservice.
+      for (String nameserviceId : addrMap.keySet()) {
+        if (bpByNameserviceId.containsKey(nameserviceId)) {
+          toRefresh.put(nameserviceId, bpByNameserviceId.get(nameserviceId));
+        } else {
+          toAdd.add(nameserviceId);
+        }
+      }
+
+      // Step 2. Any nameservices we currently have but are no longer present
+      // need to be removed.
+      for (Map.Entry<String, BPOfferService> current :
+           bpByNameserviceId.entrySet()) {
+        if (!addrMap.containsKey(current.getKey())) {
+          toRemove.put(current.getKey(), current.getValue());
+        }
+      }
+
+      assert toRefresh.size() + toAdd.size() ==
+        addrMap.size() :
+          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
+          "  toRemove: " +
+          Joiner.on(",").useForNull("<default>").join(toRemove.keySet()) +
+          "  toRefresh: " +
+          Joiner.on(",").useForNull("<default>").join(toRefresh.keySet());
+
+      // Step 3. Start new nameservices
+      if (!toAdd.isEmpty()) {
+        LOG.info("Starting BPOfferServices for nameservices: " +
+            Joiner.on(",").useForNull("<default>").join(toAdd));
+
+        for (String nsToAdd : toAdd) {
+          ArrayList<InetSocketAddress> addrs =
+            Lists.newArrayList(addrMap.get(nsToAdd).values());
+          BPOfferService bpos = createBPOS(addrs);
+          bpByNameserviceId.put(nsToAdd, bpos);
+          offerServices.add(bpos);
+        }
+      }
+      // NOTE(review): startAll() calls start() on every managed service, not
+      // only the ones just added -- presumably start() tolerates repeated
+      // calls; confirm against BPOfferService.
+      startAll();
+    }
+
+    // Step 4. Shut down old nameservices. This happens outside
+    // of the synchronized(this) lock since they need to call
+    // back to .remove() from another thread
+    if (!toRemove.isEmpty()) {
+      LOG.info("Stopping BPOfferServices for nameservices: " +
+          Joiner.on(",").useForNull("<default>").join(toRemove.keySet()));
+
+      for (BPOfferService bpos : toRemove.values()) {
+        bpos.stop();
+        bpos.join();
+        // they will call remove on their own
+      }
+    }
+
+    // Step 5. Update nameservices whose NN list has changed
+    if (!toRefresh.isEmpty()) {
+      LOG.info("Refreshing list of NNs for nameservices: " +
+          Joiner.on(",").useForNull("<default>").join(toRefresh.keySet()));
+
+      for (Map.Entry<String, BPOfferService> entry : toRefresh.entrySet()) {
+        ArrayList<InetSocketAddress> addrs =
+          Lists.newArrayList(addrMap.get(entry.getKey()).values());
+        entry.getValue().refreshNNList(addrs);
+      }
+    }
+  }
+
+  /**
+   * Extracted out for test purposes.
+   *
+   * @param nnAddrs addresses of the NameNodes of one nameservice
+   * @return a new offer service for those NameNodes, bound to this data node
+   */
+  protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+    return new BPOfferService(nnAddrs, dn);
+  }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1209249&r1=1209248&r2=1209249&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Thu Dec 1 21:26:08 2011
@@ -48,7 +48,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
import java.io.BufferedOutputStream;
@@ -71,12 +70,10 @@ import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -91,7 +88,6 @@ import org.apache.hadoop.fs.LocalFileSys
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -169,7 +165,6 @@ import org.apache.hadoop.util.VersionInf
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -236,163 +231,6 @@ public class DataNode extends Configured
return NetUtils.createSocketAddr(target);
}
- /**
- * Manages he BPOfferService objects for the data node.
- * Creation, removal, starting, stopping, shutdown on BPOfferService
- * objects must be done via APIs in this class.
- */
- @InterfaceAudience.Private
- class BlockPoolManager {
- private final Map<String, BPOfferService> bpMapping;
- private final List<BPOfferService> offerServices;
-
- //This lock is used only to ensure exclusion of refreshNamenodes
- private final Object refreshNamenodesLock = new Object();
-
- BlockPoolManager(Configuration conf)
- throws IOException {
- bpMapping = new HashMap<String, BPOfferService>();
- offerServices = new ArrayList<BPOfferService>();
-
- Map<String, Map<String, InetSocketAddress>> map =
- DFSUtil.getNNServiceRpcAddresses(conf);
- for (Entry<String, Map<String, InetSocketAddress>> entry :
- map.entrySet()) {
- List<InetSocketAddress> nnList = Lists.newArrayList(entry.getValue().values());
- BPOfferService bpos = new BPOfferService(nnList, DataNode.this);
- offerServices.add(bpos);
- }
- }
-
- synchronized void addBlockPool(BPOfferService bpos) {
- Preconditions.checkArgument(offerServices.contains(bpos),
- "Unknown BPOS: %s", bpos);
- if (bpos.getBlockPoolId() == null) {
- throw new IllegalArgumentException("Null blockpool id");
- }
- LOG.info("===> registering in bpmapping: " + bpos);
- bpMapping.put(bpos.getBlockPoolId(), bpos);
- }
-
- /**
- * Returns the array of BPOfferService objects.
- * Caution: The BPOfferService returned could be shutdown any time.
- */
- synchronized BPOfferService[] getAllNamenodeThreads() {
- BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
- return offerServices.toArray(bposArray);
- }
-
- synchronized BPOfferService get(String bpid) {
- return bpMapping.get(bpid);
- }
-
- // TODO(HA) would be good to kill this
- synchronized BPOfferService get(InetSocketAddress addr) {
- for (BPOfferService bpos : offerServices) {
- if (bpos.containsNN(addr)) {
- return bpos;
- }
- }
- return null;
- }
-
- synchronized void remove(BPOfferService t) {
- offerServices.remove(t);
- bpMapping.remove(t.getBlockPoolId());
- }
-
- void shutDownAll() throws InterruptedException {
- BPOfferService[] bposArray = this.getAllNamenodeThreads();
-
- for (BPOfferService bpos : bposArray) {
- bpos.stop(); //interrupts the threads
- }
- //now join
- for (BPOfferService bpos : bposArray) {
- bpos.join();
- }
- }
-
- synchronized void startAll() throws IOException {
- try {
- UserGroupInformation.getLoginUser().doAs(
- new PrivilegedExceptionAction<Object>() {
- public Object run() throws Exception {
- for (BPOfferService bpos : offerServices) {
- bpos.start();
- }
- return null;
- }
- });
- } catch (InterruptedException ex) {
- IOException ioe = new IOException();
- ioe.initCause(ex.getCause());
- throw ioe;
- }
- }
-
- void joinAll() {
- for (BPOfferService bpos: this.getAllNamenodeThreads()) {
- bpos.join();
- }
- }
-
- void refreshNamenodes(Configuration conf)
- throws IOException {
- throw new UnsupportedOperationException("TODO(HA)");
-/*
- * TODO(HA)
-
- LOG.info("Refresh request received for nameservices: "
- + conf.get(DFS_FEDERATION_NAMESERVICES));
-
- // TODO(HA): need to update this for multiple NNs per nameservice
- // For now, just list all of the NNs into this set
- Map<String, Map<String, InetSocketAddress>> newAddressMap =
- DFSUtil.getNNServiceRpcAddresses(conf);
- Set<InetSocketAddress> newAddresses = Sets.newHashSet();
- for (ConfiguredNNAddress cnn : DFSUtil.flattenAddressMap(newAddressMap)) {
- newAddresses.add(cnn.getAddress());
- }
-
- List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
- List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
- synchronized (refreshNamenodesLock) {
- synchronized (this) {
- for (InetSocketAddress nnaddr : offerServices.keySet()) {
- if (!(newAddresses.contains(nnaddr))) {
- toShutdown.add(offerServices.get(nnaddr));
- }
- }
- for (InetSocketAddress nnaddr : newAddresses) {
- if (!(offerServices.containsKey(nnaddr))) {
- toStart.add(nnaddr);
- }
- }
-
- for (InetSocketAddress nnaddr : toStart) {
- BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
- offerServices.put(bpos.getNNSocketAddress(), bpos);
- }
- }
-
- for (BPOfferService bpos : toShutdown) {
- bpos.stop();
- bpos.join();
- }
-
- // stoping the BPOSes causes them to call remove() on their own when they
- // clean up.
-
- // Now start the threads that are not already running.
- startAll();
- }
- */
- }
-
- }
-
volatile boolean shouldRun = true;
private BlockPoolManager blockPoolManager;
public volatile FSDatasetInterface data = null;
@@ -779,7 +617,8 @@ public class DataNode extends Configured
metrics = DataNodeMetrics.create(conf, getMachineName());
- blockPoolManager = new BlockPoolManager(conf);
+ blockPoolManager = new BlockPoolManager(this);
+ blockPoolManager.refreshNamenodes(conf);
}
/**
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java?rev=1209249&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
(added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
Thu Dec 1 21:26:08 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+/**
+ * Tests the nameservice bookkeeping of
+ * {@link BlockPoolManager#refreshNamenodes(Configuration)}.
+ * <p>
+ * The manager under test hands out Mockito mocks instead of real
+ * BPOfferService instances. Each mock appends a line to an in-memory log
+ * when it is created, refreshed or stopped, and the tests assert on the
+ * exact contents of that log.
+ */
+public class TestBlockPoolManager {
+  private static final Log LOG =
+    LogFactory.getLog(TestBlockPoolManager.class);
+
+  private final DataNode mockDN = Mockito.mock(DataNode.class);
+  private BlockPoolManager bpm;
+  /** Transcript of the create/refresh/stop events seen by the mocks. */
+  private final StringBuilder log = new StringBuilder();
+  /** Number given to the next mock BPOS; numbering starts at 1. */
+  private int mockIdx = 1;
+
+  @Before
+  public void setupBPM() {
+    bpm = new BlockPoolManager(mockDN){
+
+      @Override
+      protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+        final int idx = mockIdx++;
+        doLog("create #" + idx);
+        final BPOfferService bpos = Mockito.mock(BPOfferService.class);
+        Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString();
+        // Log refreshes
+        try {
+          Mockito.doAnswer(
+              new Answer<Void>() {
+                @Override
+                public Void answer(InvocationOnMock invocation) throws Throwable {
+                  doLog("refresh #" + idx);
+                  return null;
+                }
+              }).when(bpos).refreshNNList(
+                  Mockito.<ArrayList<InetSocketAddress>>any());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        // Log stops. The manager expects a stopped service to deregister
+        // itself, so the mock calls remove() as part of stop().
+        Mockito.doAnswer(
+            new Answer<Void>() {
+              @Override
+              public Void answer(InvocationOnMock invocation) throws Throwable {
+                doLog("stop #" + idx);
+                bpm.remove(bpos);
+                return null;
+              }
+            }).when(bpos).stop();
+        return bpos;
+      }
+    };
+  }
+
+  /** Appends one event to the transcript and echoes it to the test log. */
+  private void doLog(String string) {
+    synchronized(log) {
+      LOG.info(string);
+      log.append(string).append("\n");
+    }
+  }
+
+  /**
+   * With only a default filesystem URI configured, a refresh is expected
+   * to create exactly one BPOS.
+   */
+  @Test
+  public void testSimpleSingleNS() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY,
+        "hdfs://mock1:8020");
+    bpm.refreshNamenodes(conf);
+    assertEquals("create #1\n", log.toString());
+  }
+
+  /**
+   * Walks a federated configuration through adding two nameservices,
+   * dropping one of them, and adding it back.
+   */
+  @Test
+  public void testFederationRefresh() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1,ns2");
+    addNN(conf, "ns1", "mock1:8020");
+    addNN(conf, "ns2", "mock1:8020");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "create #1\n" +
+        "create #2\n", log.toString());
+    log.setLength(0);
+
+    // Drop ns2 from the configuration. The assertion expects the BPOS that
+    // was created first (#1) to be the one stopped, and the survivor (#2)
+    // to be refreshed.
+    // NOTE(review): which mock number belongs to which nameservice follows
+    // from the iteration order of the manager's internal hash sets --
+    // confirm that this ordering is stable.
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "stop #1\n" +
+        "refresh #2\n", log.toString());
+    log.setLength(0);
+
+    // Add back an NS -- this creates a new BPOS since the old
+    // one for ns2 should have been previously retired
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1,ns2");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "create #3\n" +
+        "refresh #2\n", log.toString());
+  }
+
+  /** Sets the NameNode RPC address key, suffixed with {@code ns}, to {@code addr}. */
+  private static void addNN(Configuration conf, String ns, String addr) {
+    String key = DFSUtil.addKeySuffixes(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);
+    conf.set(key, addr);
+  }
+}
|