Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 596A2200D12 for ; Fri, 22 Sep 2017 20:27:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 564CC1609E8; Fri, 22 Sep 2017 18:27:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 589801609D0 for ; Fri, 22 Sep 2017 20:27:08 +0200 (CEST) Received: (qmail 96585 invoked by uid 500); 22 Sep 2017 18:27:02 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 95865 invoked by uid 99); 22 Sep 2017 18:27:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Sep 2017 18:27:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1A32F5AC9; Fri, 22 Sep 2017 18:27:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhung@apache.org To: common-commits@hadoop.apache.org Date: Fri, 22 Sep 2017 18:27:16 -0000 Message-Id: <1075dadf548e42f4ac9c66b26be987d0@git.apache.org> In-Reply-To: <53e2fc90c3594a03b5a27004d84fcb29@git.apache.org> References: <53e2fc90c3594a03b5a27004d84fcb29@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/30] hadoop git commit: YARN-5946: Create YarnConfigurationStore interface and InMemoryConfigurationStore class. Contributed by Jonathan Hung archived-at: Fri, 22 Sep 2017 18:27:09 -0000 YARN-5946: Create YarnConfigurationStore interface and InMemoryConfigurationStore class. Contributed by Jonathan Hung Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/70275b53 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/70275b53 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/70275b53 Branch: refs/heads/YARN-5734 Commit: 70275b532c901a9b4b471f8c4efef62ba500a586 Parents: b53ac13 Author: Xuan Authored: Fri Feb 24 15:58:12 2017 -0800 Committer: Jonathan Hung Committed: Fri Sep 22 11:26:29 2017 -0700 ---------------------------------------------------------------------- .../conf/InMemoryConfigurationStore.java | 86 +++++++++++ .../capacity/conf/YarnConfigurationStore.java | 154 +++++++++++++++++++ .../conf/TestYarnConfigurationStore.java | 70 +++++++++ 3 files changed, 310 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/70275b53/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java new file mode 100644 index 0000000..a208fb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A default implementation of {@link YarnConfigurationStore}. Doesn't offer + * persistent configuration storage, just stores the configuration in memory. + */ +public class InMemoryConfigurationStore implements YarnConfigurationStore { + + private Configuration schedConf; + private LinkedList pendingMutations; + private long pendingId; + + @Override + public void initialize(Configuration conf, Configuration schedConf) { + this.schedConf = schedConf; + this.pendingMutations = new LinkedList<>(); + this.pendingId = 0; + } + + @Override + public synchronized long logMutation(LogMutation logMutation) { + logMutation.setId(++pendingId); + pendingMutations.add(logMutation); + return pendingId; + } + + @Override + public synchronized boolean confirmMutation(long id, boolean isValid) { + LogMutation mutation = pendingMutations.poll(); + // If confirmMutation is called out of order, discard mutations until id + // is reached. + while (mutation != null) { + if (mutation.getId() == id) { + if (isValid) { + Map mutations = mutation.getUpdates(); + for (Map.Entry kv : mutations.entrySet()) { + schedConf.set(kv.getKey(), kv.getValue()); + } + } + return true; + } + mutation = pendingMutations.poll(); + } + return false; + } + + @Override + public synchronized Configuration retrieve() { + return schedConf; + } + + @Override + public synchronized List getPendingMutations() { + return pendingMutations; + } + + @Override + public List getConfirmedConfHistory(long fromId) { + // Unimplemented. + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/70275b53/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java new file mode 100644 index 0000000..22c0ef8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; + +import java.util.List; +import java.util.Map; + +/** + * YarnConfigurationStore exposes the methods needed for retrieving and + * persisting {@link CapacityScheduler} configuration via key-value + * using write-ahead logging. When configuration mutation is requested, caller + * should first log it with {@code logMutation}, which persists this pending + * mutation. This mutation is merged to the persisted configuration only after + * {@code confirmMutation} is called. + * + * On startup/recovery, caller should call {@code retrieve} to get all + * confirmed mutations, then get pending mutations which were not confirmed via + * {@code getPendingMutations}, and replay/confirm them via + * {@code confirmMutation} as in the normal case. + */ +public interface YarnConfigurationStore { + + /** + * LogMutation encapsulates the fields needed for configuration mutation + * audit logging and recovery. + */ + class LogMutation { + private Map updates; + private String user; + private long id; + + /** + * Create log mutation prior to logging. + * @param updates key-value configuration updates + * @param user user who requested configuration change + */ + public LogMutation(Map updates, String user) { + this(updates, user, 0); + } + + /** + * Create log mutation for recovery. + * @param updates key-value configuration updates + * @param user user who requested configuration change + * @param id transaction id of configuration change + */ + LogMutation(Map updates, String user, long id) { + this.updates = updates; + this.user = user; + this.id = id; + } + + /** + * Get key-value configuration updates. + * @return map of configuration updates + */ + public Map getUpdates() { + return updates; + } + + /** + * Get user who requested configuration change. + * @return user who requested configuration change + */ + public String getUser() { + return user; + } + + /** + * Get transaction id of this configuration change. + * @return transaction id + */ + public long getId() { + return id; + } + + /** + * Set transaction id of this configuration change. + * @param id transaction id + */ + public void setId(long id) { + this.id = id; + } + } + + /** + * Initialize the configuration store. + * @param conf configuration to initialize store with + * @param schedConf Initial key-value configuration to persist + */ + void initialize(Configuration conf, Configuration schedConf); + + /** + * Logs the configuration change to backing store. Generates an id associated + * with this mutation, sets it in {@code logMutation}, and returns it. + * @param logMutation configuration change to be persisted in write ahead log + * @return id which configuration store associates with this mutation + */ + long logMutation(LogMutation logMutation); + + /** + * Should be called after {@code logMutation}. Gets the pending mutation + * associated with {@code id} and marks the mutation as persisted (no longer + * pending). If isValid is true, merge the mutation with the persisted + * configuration. + * + * If {@code confirmMutation} is called with ids in a different order than + * was returned by {@code logMutation}, the result is implementation + * dependent. + * @param id id of mutation to be confirmed + * @param isValid if true, update persisted configuration with mutation + * associated with {@code id}. + * @return true on success + */ + boolean confirmMutation(long id, boolean isValid); + + /** + * Retrieve the persisted configuration. + * @return configuration as key-value + */ + Configuration retrieve(); + + /** + * Get the list of pending mutations, in the order they were logged. + * @return list of mutations + */ + List getPendingMutations(); + + /** + * Get a list of confirmed configuration mutations starting from a given id. + * @param fromId id from which to start getting mutations, inclusive + * @return list of configuration mutations + */ + List getConfirmedConfHistory(long fromId); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/70275b53/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java new file mode 100644 index 0000000..dff4e77 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestYarnConfigurationStore { + + private YarnConfigurationStore confStore; + private Configuration schedConf; + + private static final String testUser = "testUser"; + + @Before + public void setUp() { + schedConf = new Configuration(false); + schedConf.set("key1", "val1"); + } + + @Test + public void testInMemoryConfigurationStore() { + confStore = new InMemoryConfigurationStore(); + confStore.initialize(new Configuration(), schedConf); + assertEquals("val1", confStore.retrieve().get("key1")); + + Map update1 = new HashMap<>(); + update1.put("keyUpdate1", "valUpdate1"); + LogMutation mutation1 = new LogMutation(update1, testUser); + long id = confStore.logMutation(mutation1); + assertEquals(1, confStore.getPendingMutations().size()); + confStore.confirmMutation(id, true); + assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); + assertEquals(0, confStore.getPendingMutations().size()); + + Map update2 = new HashMap<>(); + update2.put("keyUpdate2", "valUpdate2"); + LogMutation mutation2 = new LogMutation(update2, testUser); + id = confStore.logMutation(mutation2); + assertEquals(1, confStore.getPendingMutations().size()); + confStore.confirmMutation(id, false); + assertNull("Configuration should not be updated", + confStore.retrieve().get("keyUpdate2")); + assertEquals(0, confStore.getPendingMutations().size()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org