From: inigoiri@apache.org
To: common-commits@hadoop.apache.org
Date: Sat, 07 Oct 2017 02:06:04 -0000
Message-Id: <916eeb2b9013463c9582bd448ce7b78a@git.apache.org>
In-Reply-To: <0cd2adcb5e6d4e34b8c054b3370736fe@git.apache.org>
Subject: [05/31] hadoop git commit: HDFS-10882. Federation State Store Interface API. Contributed by Jason Kace and Inigo Goiri.

HDFS-10882. Federation State Store Interface API. Contributed by Jason Kace and Inigo Goiri.
(cherry picked from commit 6d94c90ece1c1d23d4c97e72c54e9991f5dbc481)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2c740a68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2c740a68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2c740a68

Branch: refs/heads/trunk
Commit: 2c740a684a23663962119726bf0e7ecef173f6f1
Parents: 533b986
Author: Inigo
Authored: Thu Apr 6 19:18:52 2017 -0700
Committer: Inigo Goiri
Committed: Fri Oct 6 18:50:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  11 ++
 .../server/federation/store/RecordStore.java    | 100 ++++++++++++++++
 .../store/driver/StateStoreSerializer.java      | 119 +++++++++++++++++++
 .../driver/impl/StateStoreSerializerPBImpl.java | 115 ++++++++++++++++++
 .../store/records/impl/pb/PBRecord.java         |  47 ++++++++
 .../store/records/impl/pb/package-info.java     |  29 +++++
 .../src/main/resources/hdfs-default.xml         |   8 ++
 7 files changed, 429 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c740a68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 92cb0ec..c485ea6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
 import org.apache.hadoop.http.HttpConfig;
 
 /**
@@ -1129,6 +1130,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT =
       "org.apache.hadoop.hdfs.server.federation.MockResolver";
 
+  // HDFS Router-based federation State Store
+  public static final String FEDERATION_STORE_PREFIX =
+      FEDERATION_ROUTER_PREFIX + "store.";
+
+  public static final String FEDERATION_STORE_SERIALIZER_CLASS =
+      DFSConfigKeys.FEDERATION_STORE_PREFIX + "serializer";
+  public static final Class<StateStoreSerializerPBImpl>
+      FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT =
+          StateStoreSerializerPBImpl.class;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
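
For illustration only (not part of this patch), the new key can also be set programmatically. SerializerConfigSketch below is a made-up class name, and it uses the StateStoreSerializer.getSerializer() factory introduced further down in this commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
  import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;

  public class SerializerConfigSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Explicitly select the protobuf serializer; this mirrors the default
      // declared by FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT above.
      conf.setClass(DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS,
          StateStoreSerializerPBImpl.class, StateStoreSerializer.class);
      StateStoreSerializer serializer = StateStoreSerializer.getSerializer(conf);
      System.out.println("State Store serializer: " + serializer.getClass().getName());
    }
  }
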
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c740a68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
new file mode 100644
index 0000000..524f432
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * Store records in the State Store. Subclasses provide interfaces to operate
+ * on those records.
+ *
+ * @param <R> Record to store by this interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RecordStore<R extends BaseRecord> {
+
+  private static final Log LOG = LogFactory.getLog(RecordStore.class);
+
+
+  /** Class of the record stored in this State Store. */
+  private final Class<R> recordClass;
+
+  /** State store driver backed by persistent storage. */
+  private final StateStoreDriver driver;
+
+
+  /**
+   * Create a new store for records.
+   *
+   * @param clazz Class of the record to store.
+   * @param stateStoreDriver Driver for the State Store.
+   */
+  protected RecordStore(Class<R> clazz, StateStoreDriver stateStoreDriver) {
+    this.recordClass = clazz;
+    this.driver = stateStoreDriver;
+  }
+
+  /**
+   * Report a required record to the data store. The data store uses this to
+   * create/maintain storage for the record.
+   *
+   * @return The class of the required record or null if no record is required
+   *         for this interface.
+   */
+  public Class<R> getRecordClass() {
+    return this.recordClass;
+  }
+
+  /**
+   * Get the State Store driver.
+   *
+   * @return State Store driver.
+   */
+  protected StateStoreDriver getDriver() {
+    return this.driver;
+  }
+
+  /**
+   * Build a state store API implementation interface.
+   *
+   * @param clazz The specific interface implementation to create.
+   * @param driver The {@link StateStoreDriver} implementation in use.
+   * @return An initialized instance of the specified state store API
+   *         implementation.
+   */
+  public static <T extends RecordStore<?>> T newInstance(
+      final Class<T> clazz, final StateStoreDriver driver) {
+
+    try {
+      Constructor<T> constructor = clazz.getConstructor(StateStoreDriver.class);
+      T recordStore = constructor.newInstance(driver);
+      return recordStore;
+    } catch (Exception e) {
+      LOG.error("Cannot create new instance for " + clazz, e);
+      return null;
+    }
+  }
+}
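
As a rough usage sketch (not from this commit): a concrete store is expected to expose a public constructor taking a StateStoreDriver so that newInstance() can build it reflectively. ExampleStore below is a hypothetical subclass; real stores for specific record types arrive in later patches of this series.

  import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
  import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
  import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;

  /** Hypothetical store used only to illustrate the factory contract. */
  public class ExampleStore extends RecordStore<BaseRecord> {

    // newInstance() looks up exactly this public constructor via reflection.
    public ExampleStore(StateStoreDriver driver) {
      super(BaseRecord.class, driver);
    }

    public static ExampleStore create(StateStoreDriver driver) {
      // Returns null (and logs an error) if the reflective construction fails.
      return RecordStore.newInstance(ExampleStore.class, driver);
    }
  }
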
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c740a68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
new file mode 100644
index 0000000..8540405
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Serializer to store and retrieve data in the State Store.
+ */
+public abstract class StateStoreSerializer {
+
+  /** Singleton for the serializer instance. */
+  private static StateStoreSerializer defaultSerializer;
+
+  /**
+   * Get the default serializer.
+   * @return Singleton serializer.
+   */
+  public static StateStoreSerializer getSerializer() {
+    return getSerializer(null);
+  }
+
+  /**
+   * Get a serializer based on the provided configuration.
+   * @param conf Configuration. Default if null.
+   * @return Singleton serializer.
+   */
+  public static StateStoreSerializer getSerializer(Configuration conf) {
+    if (conf == null) {
+      synchronized (StateStoreSerializer.class) {
+        if (defaultSerializer == null) {
+          conf = new Configuration();
+          defaultSerializer = newSerializer(conf);
+        }
+      }
+      return defaultSerializer;
+    } else {
+      return newSerializer(conf);
+    }
+  }
+
+  private static StateStoreSerializer newSerializer(final Configuration conf) {
+    Class<? extends StateStoreSerializer> serializerName = conf.getClass(
+        DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS,
+        DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT,
+        StateStoreSerializer.class);
+    return ReflectionUtils.newInstance(serializerName, conf);
+  }
+
+  /**
+   * Create a new record.
+   * @param clazz Class of the new record.
+   * @return New record.
+   */
+  public static <T> T newRecord(Class<T> clazz) {
+    return getSerializer(null).newRecordInstance(clazz);
+  }
+
+  /**
+   * Create a new record.
+   * @param clazz Class of the new record.
+   * @return New record.
+   */
+  public abstract <T> T newRecordInstance(Class<T> clazz);
+
+  /**
+   * Serialize a record into a byte array.
+   * @param record Record to serialize.
+   * @return Byte array with the serialized record.
+   */
+  public abstract byte[] serialize(BaseRecord record);
+
+  /**
+   * Serialize a record into a string.
+   * @param record Record to serialize.
+   * @return String with the serialized record.
+   */
+  public abstract String serializeString(BaseRecord record);
+
+  /**
+   * Deserialize a bytes array into a record.
+   * @param byteArray Byte array to deserialize.
+   * @param clazz Class of the record.
+   * @return New record.
+   * @throws IOException If it cannot deserialize the record.
+   */
+  public abstract <T extends BaseRecord> T deserialize(
+      byte[] byteArray, Class<T> clazz) throws IOException;
+
+  /**
+   * Deserialize a string into a record.
+   * @param data String with the data to deserialize.
+   * @param clazz Class of the record.
+   * @return New record.
+   * @throws IOException If it cannot deserialize the record.
+   */
+  public abstract <T extends BaseRecord> T deserialize(
+      String data, Class<T> clazz) throws IOException;
+}
\ No newline at end of file
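
A minimal round-trip sketch of this abstract API (illustrative only; it assumes some concrete BaseRecord subclass, which this patch does not yet provide):

  import java.io.IOException;

  import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
  import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;

  public class SerializerRoundTrip {
    /** Serialize a record and read it back through the configured serializer. */
    public static <T extends BaseRecord> T roundTrip(T record, Class<T> clazz)
        throws IOException {
      StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
      byte[] data = serializer.serialize(record);  // record -> byte array
      return serializer.deserialize(data, clazz);  // byte array -> new record
    }
  }
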
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c740a68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
new file mode 100644
index 0000000..45c5dd6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the State Store serializer.
+ */
+public final class StateStoreSerializerPBImpl extends StateStoreSerializer {
+
+  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
+  private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";
+
+  private Configuration localConf = new Configuration();
+
+
+  private StateStoreSerializerPBImpl() {
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> T newRecordInstance(Class<T> clazz) {
+    try {
+      String clazzPBImpl = getPBImplClassName(clazz);
+      Class<?> pbClazz = localConf.getClassByName(clazzPBImpl);
+      Object retObject = ReflectionUtils.newInstance(pbClazz, localConf);
+      return (T)retObject;
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String getPBImplClassName(Class<?> clazz) {
+    String srcPackagePart = getPackageName(clazz);
+    String srcClassName = getClassName(clazz);
+    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
+    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
+    return destPackagePart + "." + destClassPart;
+  }
+
+  private String getClassName(Class<?> clazz) {
+    String fqName = clazz.getName();
+    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
+  }
+
+  private String getPackageName(Class<?> clazz) {
+    return clazz.getPackage().getName();
+  }
+
+  @Override
+  public byte[] serialize(BaseRecord record) {
+    byte[] byteArray64 = null;
+    if (record instanceof PBRecord) {
+      PBRecord recordPB = (PBRecord) record;
+      Message msg = recordPB.getProto();
+      byte[] byteArray = msg.toByteArray();
+      byteArray64 = Base64.encodeBase64(byteArray, false);
+    }
+    return byteArray64;
+  }
+
+  @Override
+  public String serializeString(BaseRecord record) {
+    byte[] byteArray64 = serialize(record);
+    String base64Encoded = StringUtils.newStringUtf8(byteArray64);
+    return base64Encoded;
+  }
+
+  @Override
+  public <T extends BaseRecord> T deserialize(
+      byte[] byteArray, Class<T> clazz) throws IOException {
+
+    T record = newRecord(clazz);
+    if (record instanceof PBRecord) {
+      PBRecord pbRecord = (PBRecord) record;
+      byte[] byteArray64 = Base64.encodeBase64(byteArray, false);
+      String base64Encoded = StringUtils.newStringUtf8(byteArray64);
+      pbRecord.readInstance(base64Encoded);
+    }
+    return record;
+  }
+
+  @Override
+  public <T extends BaseRecord> T deserialize(String data, Class<T> clazz)
+      throws IOException {
+    byte[] byteArray64 = Base64.decodeBase64(data);
+    return deserialize(byteArray64, clazz);
+  }
+}
\ No newline at end of file
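
The reflective lookup in newRecordInstance() relies purely on a naming convention: the protobuf implementation of a record lives in an "impl.pb" subpackage and carries a "PBImpl" suffix. The standalone sketch below reproduces that mapping for a hypothetical record class name (MembershipState is not part of this commit):

  public class PBImplNamingSketch {
    public static void main(String[] args) {
      String recordClass =
          "org.apache.hadoop.hdfs.server.federation.store.records.MembershipState";
      int dot = recordClass.lastIndexOf('.');
      // Same transformation as getPBImplClassName() above.
      String pbImplClass = recordClass.substring(0, dot) + ".impl.pb."
          + recordClass.substring(dot + 1) + "PBImpl";
      // Prints ...store.records.impl.pb.MembershipStatePBImpl
      System.out.println(pbImplClass);
    }
  }
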
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c740a68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java
new file mode 100644
index 0000000..c369275
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/PBRecord.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
+
+import java.io.IOException;
+
+import com.google.protobuf.Message;
+
+/**
+ * A record implementation using Protobuf.
+ */
+public interface PBRecord {
+
+  /**
+   * Get the protocol for the record.
+   * @return The protocol for this record.
+   */
+  Message getProto();
+
+  /**
+   * Set the protocol for the record.
+   * @param proto Protocol for this record.
+   */
+  void setProto(Message proto);
+
+  /**
+   * Populate this record with serialized data.
+   * @param base64String Serialized data in base64.
+   * @throws IOException If it cannot read the data.
+   */
+  void readInstance(String base64String) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c740a68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java
new file mode 100644
index 0000000..b329732
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/package-info.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The protobuf implementations of state store data records defined in the
+ * org.apache.hadoop.hdfs.server.federation.store.records package. Each
+ * implementation wraps an associated protobuf proto definition.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c740a68/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 36a9b2e..67595d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4666,4 +4666,12 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.store.serializer</name>
+    <value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl</value>
+    <description>
+      Class to serialize State Store records.
+    </description>
+  </property>
+
 </configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org