Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 86C0A200CEB for ; Sat, 12 Aug 2017 18:38:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 855771650F7; Sat, 12 Aug 2017 16:38:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D900C1650F2 for ; Sat, 12 Aug 2017 18:38:05 +0200 (CEST) Received: (qmail 53413 invoked by uid 500); 12 Aug 2017 16:37:50 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 50320 invoked by uid 99); 12 Aug 2017 16:37:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Aug 2017 16:37:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BBCA2F5529; Sat, 12 Aug 2017 16:37:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Sat, 12 Aug 2017 16:38:31 -0000 Message-Id: <6ac6b3c9c36e41f5bc81bdf05edbbaf7@git.apache.org> In-Reply-To: <6fcd880663604d8d92d857336e69d06d@git.apache.org> References: <6fcd880663604d8d92d857336e69d06d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [46/50] [abbrv] hadoop git commit: HDFS-10687. Federation Membership State Store internal API. Contributed by Jason Kace and Inigo Goiri. archived-at: Sat, 12 Aug 2017 16:38:07 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5f31/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java new file mode 100644 index 0000000..2d74505 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the basic {@link ActiveNamenodeResolver} functionality. + */ +public class TestNamenodeResolver { + + private static StateStoreService stateStore; + private static ActiveNamenodeResolver namenodeResolver; + + @BeforeClass + public static void create() throws Exception { + + Configuration conf = getStateStoreConfiguration(); + + // Reduce expirations to 5 seconds + conf.setLong( + DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, + TimeUnit.SECONDS.toMillis(5)); + + stateStore = newStateStore(conf); + assertNotNull(stateStore); + + namenodeResolver = new MembershipNamenodeResolver(conf, stateStore); + namenodeResolver.setRouterId(ROUTERS[0]); + } + + @AfterClass + public static void destroy() throws Exception { + stateStore.stop(); + stateStore.close(); + } + + @Before + public void setup() throws IOException, InterruptedException { + // Wait for state store to connect + stateStore.loadDriver(); + waitStateStore(stateStore, 10000); + + // Clear NN registrations + boolean cleared = clearRecords(stateStore, MembershipState.class); + assertTrue(cleared); + } + + @Test + public void testStateStoreDisconnected() throws Exception { + + // Add an entry to the store + NamenodeStatusReport report = createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE); + assertTrue(namenodeResolver.registerNamenode(report)); + + // Close the data store driver + stateStore.closeDriver(); + assertFalse(stateStore.isDriverReady()); + + // Flush the caches + stateStore.refreshCaches(true); + + // Verify commands fail due to no cached data and no state store + // connectivity. + List nns = + namenodeResolver.getNamenodesForBlockPoolId(NAMESERVICES[0]); + assertNull(nns); + + verifyException(namenodeResolver, "registerNamenode", + StateStoreUnavailableException.class, + new Class[] {NamenodeStatusReport.class}, new Object[] {report}); + } + + /** + * Verify the first registration on the resolver. + * + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier within the nemeservice. + * @param resultsCount Number of results expected. + * @param state Expected state for the first one. + * @throws IOException If we cannot get the namenodes. + */ + private void verifyFirstRegistration(String nsId, String nnId, + int resultsCount, FederationNamenodeServiceState state) + throws IOException { + List namenodes = + namenodeResolver.getNamenodesForNameserviceId(nsId); + if (resultsCount == 0) { + assertNull(namenodes); + } else { + assertEquals(resultsCount, namenodes.size()); + if (namenodes.size() > 0) { + FederationNamenodeContext namenode = namenodes.get(0); + assertEquals(state, namenode.getState()); + assertEquals(nnId, namenode.getNamenodeId()); + } + } + } + + @Test + public void testRegistrationExpired() + throws InterruptedException, IOException { + + // Populate the state store with a single NN element + // 1) ns0:nn0 - Active + // Wait for the entry to expire without heartbeating + // Verify the NN entry is not accessible once expired. + NamenodeStatusReport report = createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE); + assertTrue(namenodeResolver.registerNamenode(report)); + + // Load cache + stateStore.refreshCaches(true); + + // Verify + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 1, + FederationNamenodeServiceState.ACTIVE); + + // Wait past expiration (set in conf to 5 seconds) + Thread.sleep(6000); + // Reload cache + stateStore.refreshCaches(true); + + // Verify entry is now expired and is no longer in the cache + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 0, + FederationNamenodeServiceState.ACTIVE); + + // Heartbeat again, updates dateModified + assertTrue(namenodeResolver.registerNamenode(report)); + // Reload cache + stateStore.refreshCaches(true); + + // Verify updated entry is marked active again and accessible to RPC server + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 1, + FederationNamenodeServiceState.ACTIVE); + } + + @Test + public void testRegistrationNamenodeSelection() + throws InterruptedException, IOException { + + // 1) ns0:nn0 - Active + // 2) ns0:nn1 - Standby (newest) + // Verify the selected entry is the active entry + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + + stateStore.refreshCaches(true); + + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 2, + FederationNamenodeServiceState.ACTIVE); + + // 1) ns0:nn0 - Expired (stale) + // 2) ns0:nn1 - Standby (newest) + // Verify the selected entry is the standby entry as the active entry is + // stale + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE))); + + // Expire active registration + Thread.sleep(6000); + + // Refresh standby registration + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + + // Verify that standby is selected (active is now expired) + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 1, + FederationNamenodeServiceState.STANDBY); + + // 1) ns0:nn0 - Active + // 2) ns0:nn1 - Unavailable (newest) + // Verify the selected entry is the active entry + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], null))); + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[0], 2, + FederationNamenodeServiceState.ACTIVE); + + // 1) ns0:nn0 - Unavailable (newest) + // 2) ns0:nn1 - Standby + // Verify the selected entry is the standby entry + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + Thread.sleep(1000); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], null))); + + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 2, + FederationNamenodeServiceState.STANDBY); + + // 1) ns0:nn0 - Active (oldest) + // 2) ns0:nn1 - Standby + // 3) ns0:nn2 - Active (newest) + // Verify the selected entry is the newest active entry + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport(NAMESERVICES[0], NAMENODES[0], null))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[2], HAServiceState.ACTIVE))); + + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[2], 3, + FederationNamenodeServiceState.ACTIVE); + + // 1) ns0:nn0 - Standby (oldest) + // 2) ns0:nn1 - Standby (newest) + // 3) ns0:nn2 - Standby + // Verify the selected entry is the newest standby entry + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.STANDBY))); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[2], HAServiceState.STANDBY))); + Thread.sleep(1500); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3, + FederationNamenodeServiceState.STANDBY); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5f31/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java index fc5aebd..598b9cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java @@ -34,9 +34,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; import org.apache.hadoop.util.Time; /** @@ -96,7 +99,7 @@ public final class FederationStateStoreTestUtils { * @throws IOException If it cannot create the State Store. * @throws InterruptedException If we cannot wait for the store to start. */ - public static StateStoreService getStateStore( + public static StateStoreService newStateStore( Configuration configuration) throws IOException, InterruptedException { StateStoreService stateStore = new StateStoreService(); @@ -205,6 +208,7 @@ public final class FederationStateStoreTestUtils { if (!synchronizeRecords(store, emptyList, recordClass)) { return false; } + store.refreshCaches(true); return true; } @@ -229,4 +233,21 @@ public final class FederationStateStoreTestUtils { } return false; } + + public static MembershipState createMockRegistrationForNamenode( + String nameserviceId, String namenodeId, + FederationNamenodeServiceState state) throws IOException { + MembershipState entry = MembershipState.newInstance( + "routerId", nameserviceId, namenodeId, "clusterId", "test", + "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", state, false); + MembershipStats stats = MembershipStats.newInstance(); + stats.setNumOfActiveDatanodes(100); + stats.setNumOfDeadDatanodes(10); + stats.setNumOfDecommissioningDatanodes(20); + stats.setNumOfDecomActiveDatanodes(15); + stats.setNumOfDecomDeadDatanodes(5); + stats.setNumOfBlocks(10); + entry.setStats(stats); + return entry; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5f31/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java new file mode 100644 index 0000000..7f6704e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +/** + * Test the basic {@link StateStoreService} {@link MountTableStore} + * functionality. + */ +public class TestStateStoreBase { + + private static StateStoreService stateStore; + private static Configuration conf; + + protected static StateStoreService getStateStore() { + return stateStore; + } + + protected static Configuration getConf() { + return conf; + } + + @BeforeClass + public static void createBase() throws IOException, InterruptedException { + + conf = getStateStoreConfiguration(); + + // Disable auto-reconnect to data store + conf.setLong(DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, + TimeUnit.HOURS.toMillis(1)); + } + + @AfterClass + public static void destroyBase() throws Exception { + if (stateStore != null) { + stateStore.stop(); + stateStore.close(); + stateStore = null; + } + } + + @Before + public void setupBase() throws IOException, InterruptedException, + InstantiationException, IllegalAccessException { + if (stateStore == null) { + stateStore = newStateStore(conf); + assertNotNull(stateStore); + } + // Wait for state store to connect + stateStore.loadDriver(); + waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5f31/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java new file mode 100644 index 0000000..26f081b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.util.Time; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the basic {@link MembershipStore} membership functionality. + */ +public class TestStateStoreMembershipState extends TestStateStoreBase { + + private static MembershipStore membershipStore; + + @BeforeClass + public static void create() { + // Reduce expirations to 5 seconds + getConf().setLong( + DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, + TimeUnit.SECONDS.toMillis(5)); + } + + @Before + public void setup() throws IOException, InterruptedException { + + membershipStore = + getStateStore().getRegisteredRecordStore(MembershipStore.class); + + // Clear NN registrations + assertTrue(clearRecords(getStateStore(), MembershipState.class)); + } + + @Test + public void testNamenodeStateOverride() throws Exception { + // Populate the state store + // 1) ns0:nn0 - Standby + String ns = "ns0"; + String nn = "nn0"; + MembershipState report = createRegistration( + ns, nn, ROUTERS[1], FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report)); + + // Load data into cache and calculate quorum + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + MembershipState existingState = getNamenodeRegistration(ns, nn); + assertEquals( + FederationNamenodeServiceState.STANDBY, existingState.getState()); + + // Override cache + UpdateNamenodeRegistrationRequest request = + UpdateNamenodeRegistrationRequest.newInstance( + ns, nn, FederationNamenodeServiceState.ACTIVE); + assertTrue(membershipStore.updateNamenodeRegistration(request).getResult()); + + MembershipState newState = getNamenodeRegistration(ns, nn); + assertEquals(FederationNamenodeServiceState.ACTIVE, newState.getState()); + } + + @Test + public void testStateStoreDisconnected() throws Exception { + + // Close the data store driver + getStateStore().closeDriver(); + assertFalse(getStateStore().isDriverReady()); + + NamenodeHeartbeatRequest hbRequest = NamenodeHeartbeatRequest.newInstance(); + hbRequest.setNamenodeMembership( + createMockRegistrationForNamenode( + "test", "test", FederationNamenodeServiceState.UNAVAILABLE)); + verifyException(membershipStore, "namenodeHeartbeat", + StateStoreUnavailableException.class, + new Class[] {NamenodeHeartbeatRequest.class}, + new Object[] {hbRequest }); + + // Information from cache, no exception should be triggered for these + // TODO - should cached info expire at some point? + GetNamenodeRegistrationsRequest getRequest = + GetNamenodeRegistrationsRequest.newInstance(); + verifyException(membershipStore, + "getNamenodeRegistrations", null, + new Class[] {GetNamenodeRegistrationsRequest.class}, + new Object[] {getRequest}); + + verifyException(membershipStore, + "getExpiredNamenodeRegistrations", null, + new Class[] {GetNamenodeRegistrationsRequest.class}, + new Object[] {getRequest}); + + UpdateNamenodeRegistrationRequest overrideRequest = + UpdateNamenodeRegistrationRequest.newInstance(); + verifyException(membershipStore, + "updateNamenodeRegistration", null, + new Class[] {UpdateNamenodeRegistrationRequest.class}, + new Object[] {overrideRequest}); + } + + private void registerAndLoadRegistrations( + List registrationList) throws IOException { + // Populate + assertTrue(synchronizeRecords( + getStateStore(), registrationList, MembershipState.class)); + + // Load into cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + } + + private MembershipState createRegistration(String ns, String nn, + String router, FederationNamenodeServiceState state) throws IOException { + MembershipState record = MembershipState.newInstance( + router, ns, + nn, "testcluster", "testblock-" + ns, "testrpc-"+ ns + nn, + "testservice-"+ ns + nn, "testlifeline-"+ ns + nn, + "testweb-" + ns + nn, state, false); + return record; + } + + @Test + public void testRegistrationMajorityQuorum() + throws InterruptedException, IOException { + + // Populate the state store with a set of non-matching elements + // 1) ns0:nn0 - Standby (newest) + // 2) ns0:nn0 - Active (oldest) + // 3) ns0:nn0 - Active (2nd oldest) + // 4) ns0:nn0 - Active (3nd oldest element, newest active element) + // Verify the selected entry is the newest majority opinion (4) + String ns = "ns0"; + String nn = "nn0"; + + // Active - oldest + MembershipState report = createRegistration( + ns, nn, ROUTERS[1], FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report)); + Thread.sleep(1000); + + // Active - 2nd oldest + report = createRegistration( + ns, nn, ROUTERS[2], FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report)); + Thread.sleep(1000); + + // Active - 3rd oldest, newest active element + report = createRegistration( + ns, nn, ROUTERS[3], FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report)); + + // standby - newest overall + report = createRegistration( + ns, nn, ROUTERS[0], FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report)); + + // Load and calculate quorum + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify quorum entry + MembershipState quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(quorumEntry.getRouterId(), ROUTERS[3]); + } + + @Test + public void testRegistrationQuorumExcludesExpired() + throws InterruptedException, IOException { + + // Populate the state store with some expired entries and verify the expired + // entries are ignored. + // 1) ns0:nn0 - Active + // 2) ns0:nn0 - Expired + // 3) ns0:nn0 - Expired + // 4) ns0:nn0 - Expired + // Verify the selected entry is the active entry + List registrationList = new ArrayList<>(); + String ns = "ns0"; + String nn = "nn0"; + String rpcAddress = "testrpcaddress"; + String serviceAddress = "testserviceaddress"; + String lifelineAddress = "testlifelineaddress"; + String blockPoolId = "testblockpool"; + String clusterId = "testcluster"; + String webAddress = "testwebaddress"; + boolean safemode = false; + + // Active + MembershipState record = MembershipState.newInstance( + ROUTERS[0], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.ACTIVE, safemode); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[1], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[2], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[3], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + registrationList.add(record); + registerAndLoadRegistrations(registrationList); + + // Verify quorum entry chooses active membership + MembershipState quorumEntry = getNamenodeRegistration( + record.getNameserviceId(), record.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + } + + @Test + public void testRegistrationQuorumAllExpired() throws IOException { + + // 1) ns0:nn0 - Expired (oldest) + // 2) ns0:nn0 - Expired + // 3) ns0:nn0 - Expired + // 4) ns0:nn0 - Expired + // Verify no entry is either selected or cached + List registrationList = new ArrayList<>(); + String ns = NAMESERVICES[0]; + String nn = NAMENODES[0]; + String rpcAddress = "testrpcaddress"; + String serviceAddress = "testserviceaddress"; + String lifelineAddress = "testlifelineaddress"; + String blockPoolId = "testblockpool"; + String clusterId = "testcluster"; + String webAddress = "testwebaddress"; + boolean safemode = false; + long startingTime = Time.now(); + + // Expired + MembershipState record = MembershipState.newInstance( + ROUTERS[0], ns, nn, clusterId, blockPoolId, + rpcAddress, webAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime - 10000); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[1], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[2], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[3], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime); + registrationList.add(record); + + registerAndLoadRegistrations(registrationList); + + // Verify no entry is found for this nameservice + assertNull(getNamenodeRegistration( + record.getNameserviceId(), record.getNamenodeId())); + } + + @Test + public void testRegistrationNoQuorum() + throws InterruptedException, IOException { + + // Populate the state store with a set of non-matching elements + // 1) ns0:nn0 - Standby (newest) + // 2) ns0:nn0 - Standby (oldest) + // 3) ns0:nn0 - Active (2nd oldest) + // 4) ns0:nn0 - Active (3nd oldest element, newest active element) + // Verify the selected entry is the newest entry (1) + MembershipState report1 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[1], + FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report1)); + Thread.sleep(100); + MembershipState report2 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[2], + FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report2)); + Thread.sleep(100); + MembershipState report3 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[3], + FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report3)); + Thread.sleep(100); + MembershipState report4 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[0], + FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report4)); + + // Load and calculate quorum + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify quorum entry uses the newest data, even though it is standby + MembershipState quorumEntry = getNamenodeRegistration( + report1.getNameserviceId(), report1.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + assertEquals( + FederationNamenodeServiceState.STANDBY, quorumEntry.getState()); + } + + @Test + public void testRegistrationExpired() + throws InterruptedException, IOException { + + // Populate the state store with a single NN element + // 1) ns0:nn0 - Active + // Wait for the entry to expire without heartbeating + // Verify the NN entry is populated as EXPIRED internally in the state store + + MembershipState report = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[0], + FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report)); + + // Load cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify quorum and entry + MembershipState quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); + + // Wait past expiration (set in conf to 5 seconds) + Thread.sleep(6000); + // Reload cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify entry is now expired and is no longer in the cache + quorumEntry = getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]); + assertNull(quorumEntry); + + // Verify entry is now expired and can't be used by RPC service + quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNull(quorumEntry); + + // Heartbeat again, updates dateModified + assertTrue(namenodeHeartbeat(report)); + // Reload cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify updated entry marked as active and is accessible to RPC server + quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); + } + + /** + * Get a single namenode membership record from the store. + * + * @param nsId The HDFS nameservice ID to search for + * @param nnId The HDFS namenode ID to search for + * @return The single NamenodeMembershipRecord that matches the query or null + * if not found. + * @throws IOException if the query could not be executed. + */ + private MembershipState getNamenodeRegistration( + final String nsId, final String nnId) throws IOException { + + MembershipState partial = MembershipState.newInstance(); + partial.setNameserviceId(nsId); + partial.setNamenodeId(nnId); + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + + List results = response.getNamenodeMemberships(); + if (results != null && results.size() == 1) { + MembershipState record = results.get(0); + return record; + } + return null; + } + + /** + * Register a namenode heartbeat with the state store. + * + * @param store FederationMembershipStateStore instance to retrieve the + * membership data records. + * @param namenode A fully populated namenode membership record to be + * committed to the data store. + * @return True if successful, false otherwise. + * @throws IOException if the state store query could not be performed. + */ + private boolean namenodeHeartbeat(MembershipState namenode) + throws IOException { + + NamenodeHeartbeatRequest request = + NamenodeHeartbeatRequest.newInstance(namenode); + NamenodeHeartbeatResponse response = + membershipStore.namenodeHeartbeat(request); + return response.getResult(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5f31/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index 7f0b36a..dc51ee9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -31,11 +31,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; import org.junit.AfterClass; @@ -54,6 +57,8 @@ public class TestStateStoreDriverBase { private static StateStoreService stateStore; private static Configuration conf; + private static final Random RANDOM = new Random(); + /** * Get the State Store driver. @@ -78,29 +83,47 @@ public class TestStateStoreDriverBase { */ public static void getStateStore(Configuration config) throws Exception { conf = config; - stateStore = FederationStateStoreTestUtils.getStateStore(conf); + stateStore = FederationStateStoreTestUtils.newStateStore(conf); + } + + private String generateRandomString() { + String randomString = "/randomString-" + RANDOM.nextInt(); + return randomString; + } + + @SuppressWarnings("rawtypes") + private T generateRandomEnum(Class enumClass) { + int x = RANDOM.nextInt(enumClass.getEnumConstants().length); + T data = enumClass.getEnumConstants()[x]; + return data; } + @SuppressWarnings("unchecked") private T generateFakeRecord(Class recordClass) throws IllegalArgumentException, IllegalAccessException, IOException { - // TODO add record + if (recordClass == MembershipState.class) { + return (T) MembershipState.newInstance(generateRandomString(), + generateRandomString(), generateRandomString(), + generateRandomString(), generateRandomString(), + generateRandomString(), generateRandomString(), + generateRandomString(), generateRandomString(), + generateRandomEnum(FederationNamenodeServiceState.class), false); + } + return null; } /** * Validate if a record is the same. * - * @param original - * @param committed + * @param original Original record. + * @param committed Committed record. * @param assertEquals Assert if the records are equal or just return. - * @return - * @throws IllegalArgumentException - * @throws IllegalAccessException + * @return If the record is successfully validated. */ private boolean validateRecord( - BaseRecord original, BaseRecord committed, boolean assertEquals) - throws IllegalArgumentException, IllegalAccessException { + BaseRecord original, BaseRecord committed, boolean assertEquals) { boolean ret = true; @@ -131,7 +154,7 @@ public class TestStateStoreDriverBase { } public static void removeAll(StateStoreDriver driver) throws IOException { - // TODO add records to remove + driver.removeAll(MembershipState.class); } public void testInsert( @@ -139,17 +162,20 @@ public class TestStateStoreDriverBase { throws IllegalArgumentException, IllegalAccessException, IOException { assertTrue(driver.removeAll(recordClass)); - QueryResult records = driver.get(recordClass); - assertTrue(records.getRecords().isEmpty()); + QueryResult queryResult0 = driver.get(recordClass); + List records0 = queryResult0.getRecords(); + assertTrue(records0.isEmpty()); // Insert single BaseRecord record = generateFakeRecord(recordClass); driver.put(record, true, false); // Verify - records = driver.get(recordClass); - assertEquals(1, records.getRecords().size()); - validateRecord(record, records.getRecords().get(0), true); + QueryResult queryResult1 = driver.get(recordClass); + List records1 = queryResult1.getRecords(); + assertEquals(1, records1.size()); + T record0 = records1.get(0); + validateRecord(record, record0, true); // Insert multiple List insertList = new ArrayList<>(); @@ -160,8 +186,9 @@ public class TestStateStoreDriverBase { driver.putAll(insertList, true, false); // Verify - records = driver.get(recordClass); - assertEquals(11, records.getRecords().size()); + QueryResult queryResult2 = driver.get(recordClass); + List records2 = queryResult2.getRecords(); + assertEquals(11, records2.size()); } public void testFetchErrors(StateStoreDriver driver, @@ -319,23 +346,23 @@ public class TestStateStoreDriverBase { public void testInsert(StateStoreDriver driver) throws IllegalArgumentException, IllegalAccessException, IOException { - // TODO add records + testInsert(driver, MembershipState.class); } public void testPut(StateStoreDriver driver) throws IllegalArgumentException, ReflectiveOperationException, IOException, SecurityException { - // TODO add records + testPut(driver, MembershipState.class); } public void testRemove(StateStoreDriver driver) throws IllegalArgumentException, IllegalAccessException, IOException { - // TODO add records + testRemove(driver, MembershipState.class); } public void testFetchErrors(StateStoreDriver driver) throws IllegalArgumentException, IllegalAccessException, IOException { - // TODO add records + testFetchErrors(driver, MembershipState.class); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85b5f31/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java new file mode 100644 index 0000000..d922414 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.junit.Test; + +/** + * Test the Membership State records. + */ +public class TestMembershipState { + + private static final String ROUTER = "router"; + private static final String NAMESERVICE = "nameservice"; + private static final String NAMENODE = "namenode"; + private static final String CLUSTER_ID = "cluster"; + private static final String BLOCKPOOL_ID = "blockpool"; + private static final String RPC_ADDRESS = "rpcaddress"; + private static final String SERVICE_ADDRESS = "serviceaddress"; + private static final String LIFELINE_ADDRESS = "lifelineaddress"; + private static final String WEB_ADDRESS = "webaddress"; + private static final boolean SAFE_MODE = false; + + private static final long DATE_CREATED = 100; + private static final long DATE_MODIFIED = 200; + + private static final long NUM_BLOCKS = 300; + private static final long NUM_FILES = 400; + private static final int NUM_DEAD = 500; + private static final int NUM_ACTIVE = 600; + private static final int NUM_DECOM = 700; + private static final int NUM_DECOM_ACTIVE = 800; + private static final int NUM_DECOM_DEAD = 900; + private static final long NUM_BLOCK_MISSING = 1000; + + private static final long TOTAL_SPACE = 1100; + private static final long AVAILABLE_SPACE = 1200; + + private static final FederationNamenodeServiceState STATE = + FederationNamenodeServiceState.ACTIVE; + + private MembershipState createRecord() throws IOException { + + MembershipState record = MembershipState.newInstance( + ROUTER, NAMESERVICE, NAMENODE, CLUSTER_ID, + BLOCKPOOL_ID, RPC_ADDRESS, SERVICE_ADDRESS, LIFELINE_ADDRESS, + WEB_ADDRESS, STATE, SAFE_MODE); + record.setDateCreated(DATE_CREATED); + record.setDateModified(DATE_MODIFIED); + + MembershipStats stats = MembershipStats.newInstance(); + stats.setNumOfBlocks(NUM_BLOCKS); + stats.setNumOfFiles(NUM_FILES); + stats.setNumOfActiveDatanodes(NUM_ACTIVE); + stats.setNumOfDeadDatanodes(NUM_DEAD); + stats.setNumOfDecommissioningDatanodes(NUM_DECOM); + stats.setNumOfDecomActiveDatanodes(NUM_DECOM_ACTIVE); + stats.setNumOfDecomDeadDatanodes(NUM_DECOM_DEAD); + stats.setNumOfBlocksMissing(NUM_BLOCK_MISSING); + stats.setTotalSpace(TOTAL_SPACE); + stats.setAvailableSpace(AVAILABLE_SPACE); + record.setStats(stats); + return record; + } + + private void validateRecord(MembershipState record) throws IOException { + + assertEquals(ROUTER, record.getRouterId()); + assertEquals(NAMESERVICE, record.getNameserviceId()); + assertEquals(CLUSTER_ID, record.getClusterId()); + assertEquals(BLOCKPOOL_ID, record.getBlockPoolId()); + assertEquals(RPC_ADDRESS, record.getRpcAddress()); + assertEquals(WEB_ADDRESS, record.getWebAddress()); + assertEquals(STATE, record.getState()); + assertEquals(SAFE_MODE, record.getIsSafeMode()); + assertEquals(DATE_CREATED, record.getDateCreated()); + assertEquals(DATE_MODIFIED, record.getDateModified()); + + MembershipStats stats = record.getStats(); + assertEquals(NUM_BLOCKS, stats.getNumOfBlocks()); + assertEquals(NUM_FILES, stats.getNumOfFiles()); + assertEquals(NUM_ACTIVE, stats.getNumOfActiveDatanodes()); + assertEquals(NUM_DEAD, stats.getNumOfDeadDatanodes()); + assertEquals(NUM_DECOM, stats.getNumOfDecommissioningDatanodes()); + assertEquals(NUM_DECOM_ACTIVE, stats.getNumOfDecomActiveDatanodes()); + assertEquals(NUM_DECOM_DEAD, stats.getNumOfDecomDeadDatanodes()); + assertEquals(TOTAL_SPACE, stats.getTotalSpace()); + assertEquals(AVAILABLE_SPACE, stats.getAvailableSpace()); + } + + @Test + public void testGetterSetter() throws IOException { + MembershipState record = createRecord(); + validateRecord(record); + } + + @Test + public void testSerialization() throws IOException { + + MembershipState record = createRecord(); + + StateStoreSerializer serializer = StateStoreSerializer.getSerializer(); + String serializedString = serializer.serializeString(record); + MembershipState newRecord = + serializer.deserialize(serializedString, MembershipState.class); + + validateRecord(newRecord); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org