Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 292C0200CC8 for ; Fri, 14 Jul 2017 21:03:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 276FD16E3F0; Fri, 14 Jul 2017 19:03:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D36AC16E3E0 for ; Fri, 14 Jul 2017 21:03:17 +0200 (CEST) Received: (qmail 81761 invoked by uid 500); 14 Jul 2017 19:03:09 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 76229 invoked by uid 99); 14 Jul 2017 19:03:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Jul 2017 19:03:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20E37F554C; Fri, 14 Jul 2017 19:03:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Fri, 14 Jul 2017 19:03:30 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [28/51] [abbrv] hadoop git commit: YARN-3663. Federation State and Policy Store (DBMS implementation). (Giovanni Matteo Fumarola via curino). 
archived-at: Fri, 14 Jul 2017 19:03:20 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 80b00ef..db04592 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -19,13 +19,14 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Calendar; import java.util.List; +import java.util.TimeZone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; -import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -87,13 +88,26 @@ public abstract class 
FederationStateStoreBaseTest { @Test public void testRegisterSubCluster() throws Exception { SubClusterId subClusterId = SubClusterId.newInstance("SC"); + SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); + long previousTimeStamp = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + SubClusterRegisterResponse result = stateStore.registerSubCluster( SubClusterRegisterRequest.newInstance(subClusterInfo)); + long currentTimeStamp = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + Assert.assertNotNull(result); Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId)); + + // The saved heartbeat is between the old one and the current timestamp + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() <= currentTimeStamp); + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() >= previousTimeStamp); } @Test @@ -120,9 +134,7 @@ public abstract class FederationStateStoreBaseTest { stateStore.deregisterSubCluster(deregisterRequest); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); } } @@ -149,9 +161,8 @@ public abstract class FederationStateStoreBaseTest { stateStore.getSubCluster(request).getSubClusterInfo(); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, - e.getCode()); + Assert.assertTrue( + e.getMessage().startsWith("SubCluster SC does not exist")); } } @@ -200,13 +211,24 @@ public abstract class FederationStateStoreBaseTest { SubClusterId subClusterId = SubClusterId.newInstance("SC"); registerSubCluster(createSubClusterInfo(subClusterId)); + long previousHeartBeat = + querySubClusterInfo(subClusterId).getLastHeartBeat(); + SubClusterHeartbeatRequest heartbeatRequest = 
SubClusterHeartbeatRequest .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability"); stateStore.subClusterHeartbeat(heartbeatRequest); + long currentTimeStamp = + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); + Assert.assertEquals(SubClusterState.SC_RUNNING, querySubClusterInfo(subClusterId).getState()); - Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat()); + + // The saved heartbeat is between the old one and the current timestamp + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() <= currentTimeStamp); + Assert.assertTrue(querySubClusterInfo(subClusterId) + .getLastHeartBeat() >= previousHeartBeat); } @Test @@ -219,9 +241,8 @@ public abstract class FederationStateStoreBaseTest { stateStore.subClusterHeartbeat(heartbeatRequest); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("SubCluster SC does not exist; cannot heartbeat")); } } @@ -281,9 +302,8 @@ public abstract class FederationStateStoreBaseTest { queryApplicationHomeSC(appId); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId + " does not exist")); } } @@ -298,8 +318,8 @@ public abstract class FederationStateStoreBaseTest { stateStore.deleteApplicationHomeSubCluster(delRequest); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); } } @@ -331,9 +351,8 @@ public abstract class FederationStateStoreBaseTest { stateStore.getApplicationHomeSubCluster(request); Assert.fail(); } catch 
(FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, - e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); } } @@ -397,8 +416,8 @@ public abstract class FederationStateStoreBaseTest { stateStore.updateApplicationHomeSubCluster((updateRequest)); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode()); + Assert.assertTrue(e.getMessage() + .startsWith("Application " + appId.toString() + " does not exist")); } } @@ -458,8 +477,8 @@ public abstract class FederationStateStoreBaseTest { stateStore.getPolicyConfiguration(request); Assert.fail(); } catch (FederationStateStoreException e) { - Assert.assertEquals( - FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode()); + Assert.assertTrue( + e.getMessage().startsWith("Policy for queue Queue does not exist")); } } @@ -499,8 +518,9 @@ public abstract class FederationStateStoreBaseTest { private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, String policyType) { - return SubClusterPolicyConfiguration.newInstance(queueName, policyType, - ByteBuffer.allocate(1)); + ByteBuffer bb = ByteBuffer.allocate(100); + bb.put((byte) 0x02); + return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb); } private void addApplicationHomeSC(ApplicationId appId, @@ -558,4 +578,8 @@ public abstract class FederationStateStoreBaseTest { this.conf = conf; } + protected Configuration getConf() { + return conf; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java new file mode 100644 index 0000000..289a3a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HSQLDB implementation of {@link FederationStateStore}. 
+ */ +public class HSQLDBFederationStateStore extends SQLFederationStateStore { + + private static final Logger LOG = + LoggerFactory.getLogger(HSQLDBFederationStateStore.class); + + private Connection conn; + + private static final String TABLE_APPLICATIONSHOMESUBCLUSTER = + " CREATE TABLE applicationsHomeSubCluster (" + + " applicationId varchar(64) NOT NULL," + + " homeSubCluster varchar(256) NOT NULL," + + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))"; + + private static final String TABLE_MEMBERSHIP = + "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL," + + " amRMServiceAddress varchar(256) NOT NULL," + + " clientRMServiceAddress varchar(256) NOT NULL," + + " rmAdminServiceAddress varchar(256) NOT NULL," + + " rmWebServiceAddress varchar(256) NOT NULL," + + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL," + + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL," + + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))"; + + private static final String TABLE_POLICIES = + "CREATE TABLE policies ( queue varchar(256) NOT NULL," + + " policyType varchar(256) NOT NULL, params varbinary(512)," + + " CONSTRAINT pk_queue PRIMARY KEY (queue))"; + + private static final String SP_REGISTERSUBCLUSTER = + "CREATE PROCEDURE sp_registerSubCluster(" + + " IN subClusterId_IN varchar(256)," + + " IN amRMServiceAddress_IN varchar(256)," + + " IN clientRMServiceAddress_IN varchar(256)," + + " IN rmAdminServiceAddress_IN varchar(256)," + + " IN rmWebServiceAddress_IN varchar(256)," + + " IN state_IN varchar(256)," + + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000)," + + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);" + + " INSERT INTO membership ( subClusterId," + + " amRMServiceAddress, clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress," + + " lastHeartBeat, state, lastStartTime," + + " capability) VALUES ( 
subClusterId_IN," + + " amRMServiceAddress_IN, clientRMServiceAddress_IN," + + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN," + + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE," + + " state_IN, lastStartTime_IN, capability_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_DEREGISTERSUBCLUSTER = + "CREATE PROCEDURE sp_deregisterSubCluster(" + + " IN subClusterId_IN varchar(256)," + + " IN state_IN varchar(64), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE membership SET state = state_IN WHERE (" + + " subClusterId = subClusterId_IN AND state != state_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_SUBCLUSTERHEARTBEAT = + "CREATE PROCEDURE sp_subClusterHeartbeat(" + + " IN subClusterId_IN varchar(256), IN state_IN varchar(64)," + + " IN capability_IN varchar(6000), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership" + + " SET capability = capability_IN, state = state_IN," + + " lastHeartBeat = NOW() AT TIME ZONE INTERVAL '0:00'" + + " HOUR TO MINUTE WHERE subClusterId = subClusterId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETSUBCLUSTER = + "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256)," + + " OUT amRMServiceAddress_OUT varchar(256)," + + " OUT clientRMServiceAddress_OUT varchar(256)," + + " OUT rmAdminServiceAddress_OUT varchar(256)," + + " OUT rmWebServiceAddress_OUT varchar(256)," + + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64)," + + " OUT lastStartTime_OUT bigint," + + " OUT capability_OUT varchar(6000))" + + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress," + + " clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress," + + " lastHeartBeat, state, lastStartTime, capability" + + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT," + + " rmAdminServiceAddress_OUT," + + " 
rmWebServiceAddress_OUT, lastHeartBeat_OUT," + + " state_OUT, lastStartTime_OUT, capability_OUT" + + " FROM membership WHERE subClusterId = subClusterId_IN; END"; + + private static final String SP_GETSUBCLUSTERS = + "CREATE PROCEDURE sp_getSubClusters()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT subClusterId, amRMServiceAddress, clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress, lastHeartBeat," + + " state, lastStartTime, capability" + + " FROM membership; OPEN result; END"; + + private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_addApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " IN homeSubCluster_IN varchar(256)," + + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " INSERT INTO applicationsHomeSubCluster " + + " (applicationId,homeSubCluster) " + + " (SELECT applicationId_IN, homeSubCluster_IN" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationId_IN" + + " HAVING COUNT(*) = 0 );" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;" + + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationID_IN; END"; + + private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_updateApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE applicationsHomeSubCluster" + + " SET homeSubCluster = homeSubCluster_IN" + + " WHERE applicationId = applicationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " OUT homeSubCluster_OUT varchar(256))" + + " 
MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT homeSubCluster INTO homeSubCluster_OUT" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationID_IN; END"; + + private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT applicationId, homeSubCluster" + + " FROM applicationsHomeSubCluster; OPEN result; END"; + + private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_SETPOLICYCONFIGURATION = + "CREATE PROCEDURE sp_setPolicyConfiguration(" + + " IN queue_IN varchar(256), IN policyType_IN varchar(256)," + + " IN params_IN varbinary(512), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM policies WHERE queue = queue_IN;" + + " INSERT INTO policies (queue, policyType, params)" + + " VALUES (queue_IN, policyType_IN, params_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETPOLICYCONFIGURATION = + "CREATE PROCEDURE sp_getPolicyConfiguration(" + + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256)," + + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT policyType, params INTO policyType_OUT, params_OUT" + + " FROM policies WHERE queue = queue_IN; END"; + + private static final String SP_GETPOLICIESCONFIGURATIONS = + "CREATE PROCEDURE sp_getPoliciesConfigurations()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT * FROM policies; OPEN result; END"; + + @Override + public void 
init(Configuration conf) { + try { + super.init(conf); + } catch (YarnException e1) { + LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage()); + } + try { + conn = getConnection(); + + LOG.info("Database Init: Start"); + + conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute(); + conn.prepareStatement(TABLE_MEMBERSHIP).execute(); + conn.prepareStatement(TABLE_POLICIES).execute(); + + conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute(); + conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute(); + conn.prepareStatement(SP_SUBCLUSTERHEARTBEAT).execute(); + conn.prepareStatement(SP_GETSUBCLUSTER).execute(); + conn.prepareStatement(SP_GETSUBCLUSTERS).execute(); + + conn.prepareStatement(SP_ADDAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_UPDATEAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETAPPLICATIONSHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_DELETEAPPLICATIONHOMESUBCLUSTER).execute(); + + conn.prepareStatement(SP_SETPOLICYCONFIGURATION).execute(); + conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute(); + conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute(); + + LOG.info("Database Init: Complete"); + conn.close(); + } catch (SQLException e) { + LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage()); + } + } + + public void closeConnection() { + try { + conn.close(); + } catch (SQLException e) { + LOG.error( + "ERROR: failed to close connection to HSQLDB DB " + e.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 64adab8..c29fc03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -28,7 +28,8 @@ public class TestMemoryFederationStateStore @Override protected FederationStateStore createStateStore() { - super.setConf(new Configuration()); + Configuration conf = new Configuration(); + super.setConf(conf); return new MemoryFederationStateStore(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java new file mode 100644 index 0000000..d4e6cc5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -0,0 +1,49 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; + +/** + * Unit tests for SQLFederationStateStore. + */ +public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { + + private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource"; + private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; + private static final String DATABASE_USERNAME = "SA"; + private static final String DATABASE_PASSWORD = ""; + + @Override + protected FederationStateStore createStateStore() { + + YarnConfiguration conf = new YarnConfiguration(); + + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, + HSQLDB_DRIVER); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME, + DATABASE_USERNAME); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD, + DATABASE_PASSWORD); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, + DATABASE_URL + System.currentTimeMillis()); + super.setConf(conf); + return new HSQLDBFederationStateStore(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java index 8ac5e81..5a5703e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java @@ -145,7 +145,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -155,7 +155,7 @@ public class TestFederationStateStoreInputValidator { try { SubClusterRegisterRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -170,7 +170,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -188,7 +188,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = 
SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -206,7 +206,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -224,7 +224,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -242,7 +242,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -257,7 +257,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -276,7 +276,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - 
.validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -294,7 +294,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -315,7 +315,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -332,7 +332,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -350,7 +350,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -368,7 +368,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch 
(FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -386,7 +386,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -404,7 +404,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -421,7 +421,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -438,7 +438,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -460,7 +460,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -477,7 +477,7 @@ public class 
TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -494,7 +494,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -510,7 +510,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -526,7 +526,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -543,7 +543,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -560,7 +560,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = 
SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -576,7 +576,7 @@ public class TestFederationStateStoreInputValidator { SubClusterRegisterRequest request = SubClusterRegisterRequest.newInstance(subClusterInfo); FederationMembershipStateStoreInputValidator - .validateSubClusterRegisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -594,7 +594,7 @@ public class TestFederationStateStoreInputValidator { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -604,7 +604,7 @@ public class TestFederationStateStoreInputValidator { try { SubClusterDeregisterRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -618,7 +618,7 @@ public class TestFederationStateStoreInputValidator { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterIdNull, stateLost); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -632,7 +632,7 @@ public class TestFederationStateStoreInputValidator { SubClusterDeregisterRequest request = SubClusterDeregisterRequest .newInstance(subClusterIdInvalid, stateLost); FederationMembershipStateStoreInputValidator - 
.validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -646,7 +646,7 @@ public class TestFederationStateStoreInputValidator { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateNull); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -660,7 +660,7 @@ public class TestFederationStateStoreInputValidator { SubClusterDeregisterRequest request = SubClusterDeregisterRequest.newInstance(subClusterId, stateNew); FederationMembershipStateStoreInputValidator - .validateSubClusterDeregisterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -677,7 +677,7 @@ public class TestFederationStateStoreInputValidator { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -687,7 +687,7 @@ public class TestFederationStateStoreInputValidator { try { SubClusterHeartbeatRequest request = null; FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -701,7 +701,7 @@ public class TestFederationStateStoreInputValidator { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterIdNull, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + 
.validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -716,7 +716,7 @@ public class TestFederationStateStoreInputValidator { SubClusterHeartbeatRequest.newInstance(subClusterIdInvalid, lastHeartBeat, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -730,7 +730,7 @@ public class TestFederationStateStoreInputValidator { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateNull, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -745,7 +745,7 @@ public class TestFederationStateStoreInputValidator { SubClusterHeartbeatRequest.newInstance(subClusterId, lastHeartBeatNegative, stateLost, capability); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -759,7 +759,7 @@ public class TestFederationStateStoreInputValidator { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityNull); FederationMembershipStateStoreInputValidator - .validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -773,7 +773,7 @@ public class TestFederationStateStoreInputValidator { SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityEmpty); FederationMembershipStateStoreInputValidator - 
.validateSubClusterHeartbeatRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -791,7 +791,7 @@ public class TestFederationStateStoreInputValidator { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterId); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -801,7 +801,7 @@ public class TestFederationStateStoreInputValidator { try { GetSubClusterInfoRequest request = null; FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -815,7 +815,7 @@ public class TestFederationStateStoreInputValidator { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterIdNull); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -829,7 +829,7 @@ public class TestFederationStateStoreInputValidator { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterIdInvalid); FederationMembershipStateStoreInputValidator - .validateGetSubClusterInfoRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -850,7 +850,7 @@ public class TestFederationStateStoreInputValidator { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -860,7 +860,7 
@@ public class TestFederationStateStoreInputValidator { try { AddApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -875,7 +875,7 @@ public class TestFederationStateStoreInputValidator { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -891,7 +891,7 @@ public class TestFederationStateStoreInputValidator { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -908,7 +908,7 @@ public class TestFederationStateStoreInputValidator { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -925,7 +925,7 @@ public class TestFederationStateStoreInputValidator { AddApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateAddApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -944,7 +944,7 @@ public class TestFederationStateStoreInputValidator { 
UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -954,7 +954,7 @@ public class TestFederationStateStoreInputValidator { try { UpdateApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -969,7 +969,7 @@ public class TestFederationStateStoreInputValidator { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -985,7 +985,7 @@ public class TestFederationStateStoreInputValidator { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -1002,7 +1002,7 @@ public class TestFederationStateStoreInputValidator { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { LOG.info(e.getMessage()); @@ -1019,7 +1019,7 @@ public class TestFederationStateStoreInputValidator { UpdateApplicationHomeSubClusterRequest .newInstance(applicationHomeSubCluster); 
FederationApplicationHomeSubClusterStoreInputValidator - .validateUpdateApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1035,7 +1035,7 @@ public class TestFederationStateStoreInputValidator { GetApplicationHomeSubClusterRequest request = GetApplicationHomeSubClusterRequest.newInstance(appId); FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1045,7 +1045,7 @@ public class TestFederationStateStoreInputValidator { try { GetApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1058,7 +1058,7 @@ public class TestFederationStateStoreInputValidator { GetApplicationHomeSubClusterRequest request = GetApplicationHomeSubClusterRequest.newInstance(appIdNull); FederationApplicationHomeSubClusterStoreInputValidator - .validateGetApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1075,7 +1075,7 @@ public class TestFederationStateStoreInputValidator { DeleteApplicationHomeSubClusterRequest request = DeleteApplicationHomeSubClusterRequest.newInstance(appId); FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1085,7 +1085,7 @@ public class TestFederationStateStoreInputValidator { try 
{ DeleteApplicationHomeSubClusterRequest request = null; FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1098,7 +1098,7 @@ public class TestFederationStateStoreInputValidator { DeleteApplicationHomeSubClusterRequest request = DeleteApplicationHomeSubClusterRequest.newInstance(appIdNull); FederationApplicationHomeSubClusterStoreInputValidator - .validateDeleteApplicationHomeSubClusterRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Application Id.")); @@ -1115,7 +1115,7 @@ public class TestFederationStateStoreInputValidator { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queue); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1125,7 +1125,7 @@ public class TestFederationStateStoreInputValidator { try { GetSubClusterPolicyConfigurationRequest request = null; FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1138,7 +1138,7 @@ public class TestFederationStateStoreInputValidator { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queueNull); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1150,7 +1150,7 @@ public class 
TestFederationStateStoreInputValidator { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance(queueEmpty); FederationPolicyStoreInputValidator - .validateGetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1169,7 +1169,7 @@ public class TestFederationStateStoreInputValidator { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); } catch (FederationStateStoreInvalidInputException e) { Assert.fail(e.getMessage()); } @@ -1179,7 +1179,7 @@ public class TestFederationStateStoreInputValidator { try { SetSubClusterPolicyConfigurationRequest request = null; FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage() @@ -1193,7 +1193,7 @@ public class TestFederationStateStoreInputValidator { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue( @@ -1208,7 +1208,7 @@ public class TestFederationStateStoreInputValidator { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing 
Queue.")); @@ -1222,7 +1222,7 @@ public class TestFederationStateStoreInputValidator { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Queue.")); @@ -1236,7 +1236,7 @@ public class TestFederationStateStoreInputValidator { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type.")); @@ -1250,7 +1250,7 @@ public class TestFederationStateStoreInputValidator { SetSubClusterPolicyConfigurationRequest request = SetSubClusterPolicyConfigurationRequest.newInstance(policy); FederationPolicyStoreInputValidator - .validateSetSubClusterPolicyConfigurationRequest(request); + .validate(request); Assert.fail(); } catch (FederationStateStoreInvalidInputException e) { Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type.")); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java index 632e865..304910e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java @@ -24,7 +24,6 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; @@ -82,10 +81,8 @@ public class TestFederationStateStoreFacadeRetry { conf = new Configuration(); conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries); RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf); - RetryAction action = policy.shouldRetry( - new FederationStateStoreException( - FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL), - 0, 0, false); + RetryAction action = policy + .shouldRetry(new FederationStateStoreException("Error"), 0, 0, false); Assert.assertEquals(RetryAction.FAIL.action, action.action); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql new file mode 100644 index 0000000..66d6f0e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +USE [FederationStateStore] +GO + +IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_addApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256), + @storedHomeSubCluster VARCHAR(256) OUTPUT, + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + -- If application to sub-cluster map doesn't exist, insert it. 
+ -- Otherwise don't change the current mapping. + IF NOT EXISTS (SELECT TOP 1 * + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId) + + INSERT INTO [dbo].[applicationsHomeSubCluster] ( + [applicationId], + [homeSubCluster]) + VALUES ( + @applicationId, + @homeSubCluster); + -- End of the IF block + + SELECT @rowCount = @@ROWCOUNT; + + SELECT @storedHomeSubCluster = [homeSubCluster] + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_updateApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_updateApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + UPDATE [dbo].[applicationsHomeSubCluster] + SET [homeSubCluster] = @homeSubCluster + WHERE [applicationId] = @applicationid; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getApplicationsHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + SELECT [applicationId], [homeSubCluster], [createTime] + FROM [dbo].[applicationsHomeSubCluster] + END TRY + + 
BEGIN CATCH + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_getApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_getApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @homeSubCluster VARCHAR(256) OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + + SELECT @homeSubCluster = [homeSubCluster] + FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationid; + + END TRY + + BEGIN CATCH + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_deleteApplicationHomeSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_deleteApplicationHomeSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_deleteApplicationHomeSubCluster] + @applicationId VARCHAR(64), + @rowCount int OUTPUT +AS BEGIN + DECLARE @errorMessage nvarchar(4000) + + BEGIN TRY + BEGIN TRAN + + DELETE FROM [dbo].[applicationsHomeSubCluster] + WHERE [applicationId] = @applicationId; + SELECT @rowCount = @@ROWCOUNT; + + COMMIT TRAN + END TRY + + BEGIN CATCH + ROLLBACK TRAN + + SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE()) + + /* raise error and terminate the execution */ + RAISERROR(@errorMessage, --- Error Message + 1, -- Severity + -1 -- State + ) WITH log + END CATCH +END; +GO + +IF OBJECT_ID ( '[sp_registerSubCluster]', 'P' ) IS NOT NULL + DROP PROCEDURE [sp_registerSubCluster]; +GO + +CREATE PROCEDURE [dbo].[sp_registerSubCluster] + @subClusterId VARCHAR(256), + @amRMServiceAddress VARCHAR(256), + @clientRMServiceAddress VARCHAR(256), + 
@rmAdminServiceAddress VARCHAR(256),
+    @rmWebServiceAddress VARCHAR(256),
+    @state VARCHAR(32),
+    @lastStartTime BIGINT,
+    @capability VARCHAR(6000),
+    @rowCount int OUTPUT -- receives @@ROWCOUNT of the INSERT below
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            DELETE FROM [dbo].[membership] -- drop any existing row for this sub-cluster so the INSERT below replaces it
+            WHERE [subClusterId] = @subClusterId;
+            INSERT INTO [dbo].[membership] (
+                [subClusterId],
+                [amRMServiceAddress],
+                [clientRMServiceAddress],
+                [rmAdminServiceAddress],
+                [rmWebServiceAddress],
+                [lastHeartBeat],
+                [state],
+                [lastStartTime],
+                [capability] )
+            VALUES (
+                @subClusterId,
+                @amRMServiceAddress,
+                @clientRMServiceAddress,
+                @rmAdminServiceAddress,
+                @rmWebServiceAddress,
+                GETUTCDATE(), -- [lastHeartBeat] is stamped with the server's current UTC time, not supplied by the caller
+                @state,
+                @lastStartTime,
+                @capability);
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getSubClusters]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getSubClusters];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getSubClusters] -- Returns every row of [dbo].[membership] as a result set (no filter).
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        SELECT [subClusterId], [amRMServiceAddress], [clientRMServiceAddress],
+               [rmAdminServiceAddress], [rmWebServiceAddress], [lastHeartBeat],
+               [state], [lastStartTime], [capability]
+        FROM [dbo].[membership]
+    END TRY
+
+    BEGIN CATCH
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getSubCluster]
+    @subClusterId VARCHAR(256),
+    @amRMServiceAddress VARCHAR(256) OUTPUT,
+    @clientRMServiceAddress
VARCHAR(256) OUTPUT,
+    @rmAdminServiceAddress VARCHAR(256) OUTPUT,
+    @rmWebServiceAddress VARCHAR(256) OUTPUT,
+    @lastHeartbeat DATETIME2 OUTPUT,
+    @state VARCHAR(256) OUTPUT,
+    @lastStartTime BIGINT OUTPUT,
+    @capability VARCHAR(6000) OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            SELECT @subClusterId = [subClusterId], -- every OUTPUT parameter keeps its incoming value when no row matches
+                   @amRMServiceAddress = [amRMServiceAddress],
+                   @clientRMServiceAddress = [clientRMServiceAddress],
+                   @rmAdminServiceAddress = [rmAdminServiceAddress],
+                   @rmWebServiceAddress = [rmWebServiceAddress],
+                   @lastHeartbeat = [lastHeartBeat], -- variable spelled as declared in the parameter list; column spelled as in the table
+                   @state = [state],
+                   @lastStartTime = [lastStartTime],
+                   @capability = [capability]
+            FROM [dbo].[membership]
+            WHERE [subClusterId] = @subClusterId
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity (NOTE(review): 1 is in SQL Server's informational range; confirm callers do not expect the call to fail)
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+
+IF OBJECT_ID ( '[sp_subClusterHeartbeat]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_subClusterHeartbeat];
+GO
+
+CREATE PROCEDURE [dbo].[sp_subClusterHeartbeat] -- Records a heartbeat: refreshes state, capability and the heartbeat timestamp of one sub-cluster.
+    @subClusterId VARCHAR(256), -- key of the membership row to refresh
+    @state VARCHAR(256), -- NOTE(review): wider than the VARCHAR(32) [state] column; confirm callers never send more than 32 characters
+    @capability VARCHAR(6000),
+    @rowCount int OUTPUT -- receives @@ROWCOUNT of the UPDATE (0 when the sub-cluster is not registered)
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            UPDATE [dbo].[membership]
+            SET [state] = @state,
+                [lastHeartBeat] = GETUTCDATE(), -- column name cased as in the [membership] table definition; value is server UTC time
+                [capability] = @capability
+            WHERE [subClusterId] = @subClusterId;
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_deregisterSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_deregisterSubCluster] -- Changes only the [state] of one membership row; the row itself is kept.
+    @subClusterId VARCHAR(256), -- key of the membership row
+    @state VARCHAR(256), -- new value for [state]
+    @rowCount int OUTPUT -- receives @@ROWCOUNT of the UPDATE
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            UPDATE [dbo].[membership]
+            SET [state] = @state
+            WHERE [subClusterId] = @subClusterId;
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_setPolicyConfiguration]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_setPolicyConfiguration];
+GO
+
+CREATE PROCEDURE [dbo].[sp_setPolicyConfiguration] -- Stores the policy of one queue, replacing any policy already stored for it.
+    @queue VARCHAR(256), -- key of the policy row
+    @policyType VARCHAR(256),
+    @params VARBINARY(6000), -- same width as [policies].[params] and the getter's @params, so a policy is not cut at 512 bytes on the way in
+    @rowCount int OUTPUT -- receives @@ROWCOUNT of the INSERT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            DELETE FROM [dbo].[policies] -- delete-then-insert inside one transaction acts as an upsert for this queue
+            WHERE [queue] = @queue;
+            INSERT INTO [dbo].[policies] (
+                [queue],
+                [policyType],
+                [params])
+            VALUES (
+                @queue,
+                @policyType,
+                @params);
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity (NOTE(review): 1 is in SQL Server's informational range; confirm callers do not expect the call to fail)
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getPolicyConfiguration]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getPolicyConfiguration];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getPolicyConfiguration]
+    @queue VARCHAR(256),
+    @policyType VARCHAR(256) OUTPUT,
+    @params VARBINARY(6000) OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+
+        SELECT @policyType = [policyType],
+               @params = [params]
+        FROM [dbo].[policies]
+        WHERE [queue] = @queue
+
+    END TRY
+
+    BEGIN CATCH
+
+        SET @errorMessage =
dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getPoliciesConfigurations]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getPoliciesConfigurations];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getPoliciesConfigurations] -- Returns every row of [dbo].[policies] as a result set (no filter).
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        SELECT [queue], [policyType], [params] FROM [dbo].[policies]
+    END TRY
+
+    BEGIN CATCH
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /* raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity (NOTE(review): 1 is in SQL Server's informational range; confirm callers do not expect the call to fail)
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2180e46f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql
new file mode 100644
index 0000000..a97385b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+USE [FederationStateStore]
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables -- create the table only when dbo.applicationsHomeSubCluster is absent, so the script can be re-run
+    WHERE name = 'applicationsHomeSubCluster'
+    AND schema_id = SCHEMA_ID('dbo'))
+    BEGIN
+        PRINT 'Table applicationsHomeSubCluster does not exist, create it...'
+
+        SET ANSI_NULLS ON
+
+        SET QUOTED_IDENTIFIER ON
+
+        SET ANSI_PADDING ON
+
+        CREATE TABLE [dbo].[applicationsHomeSubCluster](
+            applicationId VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL, -- primary key (pk_applicationId); the BIN2 collation makes key comparison binary, hence case-sensitive
+            homeSubCluster VARCHAR(256) NOT NULL,
+            createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(), -- filled with the server's UTC time when an INSERT omits the column
+
+            CONSTRAINT [pk_applicationId] PRIMARY KEY
+            (
+                [applicationId]
+            )
+        )
+
+        SET ANSI_PADDING OFF
+
+        PRINT 'Table applicationsHomeSubCluster created.'
+    END
+ELSE
+    PRINT 'Table applicationsHomeSubCluster exists, no operation required...'
+    GO
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables -- same existence guard, for dbo.membership
+    WHERE name = 'membership'
+    AND schema_id = SCHEMA_ID('dbo'))
+    BEGIN
+        PRINT 'Table membership does not exist, create it...'
+
+        SET ANSI_NULLS ON
+
+        SET QUOTED_IDENTIFIER ON
+
+        SET ANSI_PADDING ON
+
+        CREATE TABLE [dbo].[membership](
+            [subClusterId] VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL, -- primary key (pk_subClusterId); the BIN2 collation makes key comparison binary, hence case-sensitive
+            [amRMServiceAddress] VARCHAR(256) NOT NULL,
+            [clientRMServiceAddress] VARCHAR(256) NOT NULL,
+            [rmAdminServiceAddress] VARCHAR(256) NOT NULL,
+            [rmWebServiceAddress] VARCHAR(256) NOT NULL,
+            [lastHeartBeat] DATETIME2 NOT NULL, -- no default here: the stored procedures in this patch write it with GETUTCDATE()
+            [state] VARCHAR(32) NOT NULL, -- NOTE(review): several stored procedures declare @state as VARCHAR(256); confirm 32 characters is wide enough
+            [lastStartTime] BIGINT NOT NULL,
+            [capability] VARCHAR(6000) NOT NULL,
+
+            CONSTRAINT [pk_subClusterId] PRIMARY KEY
+            (
+                [subClusterId]
+            )
+        )
+
+        SET ANSI_PADDING OFF
+
+        PRINT 'Table membership created.'
+    END
+ELSE
+    PRINT 'Table membership exists, no operation required...'
+    GO
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables -- create the table only when dbo.policies is absent, so the script can be re-run
+    WHERE name = 'policies'
+    AND schema_id = SCHEMA_ID('dbo'))
+    BEGIN
+        PRINT 'Table policies does not exist, create it...'
+
+        SET ANSI_NULLS ON
+
+        SET QUOTED_IDENTIFIER ON
+
+        SET ANSI_PADDING ON
+
+        CREATE TABLE [dbo].[policies](
+            queue VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL, -- primary key (pk_queue); the BIN2 collation makes key comparison binary, hence case-sensitive
+            policyType VARCHAR(256) NOT NULL,
+            params VARBINARY(6000) NOT NULL, -- opaque binary payload; this script does not interpret its format
+
+            CONSTRAINT [pk_queue] PRIMARY KEY
+            (
+                [queue]
+            )
+        )
+
+        SET ANSI_PADDING OFF
+
+        PRINT 'Table policies created.'
+    END
+ELSE
+    PRINT 'Table policies exists, no operation required...'
+    GO
+GO
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org