From hcatalog-commits-return-776-apmail-incubator-hcatalog-commits-archive=incubator.apache.org@incubator.apache.org Fri Mar 23 18:15:08 2012 Return-Path: X-Original-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F01DF981B for ; Fri, 23 Mar 2012 18:15:08 +0000 (UTC) Received: (qmail 90460 invoked by uid 500); 23 Mar 2012 18:15:08 -0000 Delivered-To: apmail-incubator-hcatalog-commits-archive@incubator.apache.org Received: (qmail 90400 invoked by uid 500); 23 Mar 2012 18:15:08 -0000 Mailing-List: contact hcatalog-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hcatalog-dev@incubator.apache.org Delivered-To: mailing list hcatalog-commits@incubator.apache.org Received: (qmail 90382 invoked by uid 99); 23 Mar 2012 18:15:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Mar 2012 18:15:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Mar 2012 18:14:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9ACD4238890B; Fri, 23 Mar 2012 18:14:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1304536 - in /incubator/hcatalog/trunk: ./ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ storage-ha... Date: Fri, 23 Mar 2012 18:14:37 -0000 To: hcatalog-commits@incubator.apache.org From: toffer@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120323181437.9ACD4238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: toffer Date: Fri Mar 23 18:14:36 2012 New Revision: 1304536 URL: http://svn.apache.org/viewvc?rev=1304536&view=rev Log: HCATALOG-310 Turn current RM implementation into HBase Coprocessor (thw via toffer) Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java Modified: incubator/hcatalog/trunk/CHANGES.txt incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java Modified: incubator/hcatalog/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1304536&r1=1304535&r2=1304536&view=diff ============================================================================== --- incubator/hcatalog/trunk/CHANGES.txt (original) +++ incubator/hcatalog/trunk/CHANGES.txt Fri Mar 23 18:14:36 2012 @@ -62,6 +62,8 @@ Release 0.4.0 - Unreleased HCAT-240. Changes to HCatOutputFormat to make it use SerDes instead of StorageDriver (toffer) NEW FEATURES + HCAT-310 Turn current RM implementation into HBase Coprocessor (thw via toffer) + HCAT-334 HCatalog should generate a POM file so it can be deployed to a maven repo (traviscrawford via gates) HCAT-296 Hcatalog should be able talk to secure hbase server using hbase delegation token (rohini via toffer) Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java?rev=1304536&r1=1304535&r2=1304536&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java Fri Mar 23 18:14:36 2012 @@ -24,13 +24,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hive.hbase.HBaseSerDe; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatUtil; @@ -40,7 +38,6 @@ import org.apache.hcatalog.hbase.snapsho import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory; import org.apache.hcatalog.hbase.snapshot.TableSnapshot; import org.apache.hcatalog.hbase.snapshot.Transaction; -import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager; import org.apache.hcatalog.mapreduce.HCatTableInfo; import org.apache.hcatalog.mapreduce.InputJobInfo; import org.apache.hcatalog.mapreduce.OutputJobInfo; @@ -126,37 +123,7 @@ class HBaseRevisionManagerUtil { * @throws IOException */ static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException { - - Properties properties = new Properties(); - String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM); - int port = jobConf.getInt("hbase.zookeeper.property.clientPort", - HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); - - if (zkHostList != null) { - String[] splits = zkHostList.split(","); - StringBuffer sb = new StringBuffer(); - for (String split : splits) { - sb.append(split); - sb.append(':'); - sb.append(port); - sb.append(','); - } - - sb.deleteCharAt(sb.length() - 1); - properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString()); - } - String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR); - if (dataDir != null) { - properties.put(ZKBasedRevisionManager.DATADIR, dataDir); - } - String rmClassName = jobConf.get( - RevisionManager.REVISION_MGR_IMPL_CLASS, - ZKBasedRevisionManager.class.getName()); - properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName); - RevisionManager revisionManger = RevisionManagerFactory - .getRevisionManager(properties); - revisionManger.open(); - return revisionManger; + return RevisionManagerFactory.getOpenedRevisionManager(jobConf); } static void closeRevisionManagerQuietly(RevisionManager rm) { Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1304536&r1=1304535&r2=1304536&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Fri Mar 23 18:14:36 2012 @@ -25,8 +25,11 @@ import java.util.Properties; * This interface provides APIs for implementing revision management. */ public interface RevisionManager { - - public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class"; + /** + * Version property required by HBase to use this interface + * for CoprocessorProtocol / RPC. + */ + public static final long VERSION = 1L; // do not change /** * Initialize the revision manager. Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java?rev=1304536&view=auto ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java (added) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java Fri Mar 23 18:14:36 2012 @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of RevisionManager as HBase RPC endpoint. This class will control the lifecycle of + * and delegate to the actual RevisionManager implementation and make it available as a service + * hosted in the HBase region server (instead of running it in the client (storage handler). + * In the case of {@link ZKBasedRevisionManager} now only the region servers need write access to + * manage revision data. + */ +public class RevisionManagerEndpoint extends BaseEndpointCoprocessor implements RevisionManagerProtocol { + + private static final Logger LOGGER = + LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName()); + public static final String REVISION_MGR_ENDPOINT_IMPL_CLASS = "revision.manager.endpoint.impl.class"; + + private RevisionManager rmImpl = null; + + @Override + public void start(CoprocessorEnvironment env) { + super.start(env); + try { + Configuration conf = env.getConfiguration(); + String className = conf.get(REVISION_MGR_ENDPOINT_IMPL_CLASS, + ZKBasedRevisionManager.class.getName()); + rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf); + } catch (IOException e) { + LOGGER.error("Failed to initialize revision manager", e); + } + } + + @Override + public void stop(CoprocessorEnvironment env) { + if (rmImpl != null) { + try { + rmImpl.close(); + } catch (IOException e) { + LOGGER.warn("Error closing revision manager.", e); + } + } + super.stop(env); + } + + @Override + public void initialize(Properties properties) { + // do nothing, HBase controls life cycle + } + + @Override + public void open() throws IOException { + // do nothing, HBase controls life cycle + } + + @Override + public void close() throws IOException { + // do nothing, HBase controls life cycle + } + + @Override + public Transaction beginWriteTransaction(String table, List families) + throws IOException { + return rmImpl.beginWriteTransaction(table, families); + } + + @Override + public Transaction beginWriteTransaction(String table, + List families, long keepAlive) throws IOException { + return rmImpl.beginWriteTransaction(table, families, keepAlive); + } + + @Override + public void commitWriteTransaction(Transaction transaction) + throws IOException { + rmImpl.commitWriteTransaction(transaction); + } + + @Override + public void abortWriteTransaction(Transaction transaction) + throws IOException { + rmImpl.abortWriteTransaction(transaction); + } + + @Override + public TableSnapshot createSnapshot(String tableName) throws IOException { + return rmImpl.createSnapshot(tableName); + } + + @Override + public TableSnapshot createSnapshot(String tableName, long revision) + throws IOException { + return rmImpl.createSnapshot(tableName, revision); + } + + @Override + public void keepAlive(Transaction transaction) throws IOException { + rmImpl.keepAlive(transaction); + } + + @Override + public List getAbortedWriteTransactions(String table, + String columnFamily) throws IOException { + return rmImpl.getAbortedWriteTransactions(table, columnFamily); + } + +} Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java?rev=1304536&view=auto ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java (added) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java Fri Mar 23 18:14:36 2012 @@ -0,0 +1,97 @@ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is nothing but a delegate for the enclosed proxy, + * which is created upon setting the configuration. + */ +public class RevisionManagerEndpointClient implements RevisionManager, Configurable { + + private Configuration conf = null; + private RevisionManager rmProxy; + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration arg0) { + this.conf = arg0; + } + + @Override + public void initialize(Properties properties) { + // do nothing + } + + @Override + public void open() throws IOException { + // clone to adjust RPC settings unique to proxy + Configuration clonedConf = new Configuration(conf); + // conf.set("hbase.ipc.client.connect.max.retries", "0"); + // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1); + clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC + HTable table = new HTable(clonedConf, HConstants.META_TABLE_NAME); + rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class, + Bytes.toBytes("anyRow")); + rmProxy.open(); + } + + @Override + public void close() throws IOException { + rmProxy.close(); + } + + @Override + public Transaction beginWriteTransaction(String table, List families) throws IOException { + return rmProxy.beginWriteTransaction(table, families); + } + + @Override + public Transaction beginWriteTransaction(String table, List families, long keepAlive) + throws IOException { + return rmProxy.beginWriteTransaction(table, families, keepAlive); + } + + @Override + public void commitWriteTransaction(Transaction transaction) throws IOException { + rmProxy.commitWriteTransaction(transaction); + } + + @Override + public void abortWriteTransaction(Transaction transaction) throws IOException { + rmProxy.abortWriteTransaction(transaction); + } + + @Override + public List getAbortedWriteTransactions(String table, String columnFamily) + throws IOException { + return rmProxy.getAbortedWriteTransactions(table, columnFamily); + } + + @Override + public TableSnapshot createSnapshot(String tableName) throws IOException { + return rmProxy.createSnapshot(tableName); + } + + @Override + public TableSnapshot createSnapshot(String tableName, long revision) throws IOException { + return rmProxy.createSnapshot(tableName, revision); + } + + @Override + public void keepAlive(Transaction transaction) throws IOException { + rmProxy.keepAlive(transaction); + } + +} Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1304536&r1=1304535&r2=1304536&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Fri Mar 23 18:14:36 2012 @@ -20,16 +20,26 @@ package org.apache.hcatalog.hbase.snapsh import java.io.IOException; import java.util.Properties; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; + +/** + * Utility to instantiate the revision manager (not a true factory actually). + * Depends on HBase configuration to resolve ZooKeeper connection (when ZK is used). + */ public class RevisionManagerFactory { - /** - * Gets an instance of revision manager. - * - * @param properties The properties required to created the revision manager. - * @return the revision manager An instance of revision manager. - * @throws IOException Signals that an I/O exception has occurred. - */ - public static RevisionManager getRevisionManager(Properties properties) throws IOException{ + public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class"; + + /** + * Gets an instance of revision manager. + * + * @param properties The properties required to created the revision manager. + * @return the revision manager An instance of revision manager. + * @throws IOException Signals that an I/O exception has occurred. + */ + private static RevisionManager getRevisionManager(String className, Properties properties) throws IOException{ RevisionManager revisionMgr; ClassLoader classLoader = Thread.currentThread() @@ -37,14 +47,9 @@ public class RevisionManagerFactory { if (classLoader == null) { classLoader = RevisionManagerFactory.class.getClassLoader(); } - String className = properties.getProperty( - RevisionManager.REVISION_MGR_IMPL_CLASS, - ZKBasedRevisionManager.class.getName()); try { - - @SuppressWarnings("unchecked") - Class revisionMgrClass = (Class) Class - .forName(className, true , classLoader); + Class revisionMgrClass = Class + .forName(className, true , classLoader).asSubclass(RevisionManager.class); revisionMgr = (RevisionManager) revisionMgrClass.newInstance(); revisionMgr.initialize(properties); } catch (ClassNotFoundException e) { @@ -67,4 +72,58 @@ public class RevisionManagerFactory { return revisionMgr; } + /** + * Internally used by endpoint implementation to instantiate from different configuration setting. + * @param className + * @param conf + * @return + * @throws IOException + */ + static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException { + + Properties properties = new Properties(); + String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM); + int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + + if (zkHostList != null) { + String[] splits = zkHostList.split(","); + StringBuffer sb = new StringBuffer(); + for (String split : splits) { + sb.append(split); + sb.append(':'); + sb.append(port); + sb.append(','); + } + + sb.deleteCharAt(sb.length() - 1); + properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString()); + } + String dataDir = conf.get(ZKBasedRevisionManager.DATADIR); + if (dataDir != null) { + properties.put(ZKBasedRevisionManager.DATADIR, dataDir); + } + RevisionManager revisionMgr = RevisionManagerFactory + .getRevisionManager(className, properties); + if (revisionMgr instanceof Configurable) { + ((Configurable)revisionMgr).setConf(conf); + } + revisionMgr.open(); + return revisionMgr; + } + + /** + * Gets an instance of revision manager which is opened. + * The revision manager implementation can be specified as {@link #REVISION_MGR_IMPL_CLASS}, + * default is {@link ZKBasedRevisionManager}. + * @param hbaseConf The HBase configuration. + * @return RevisionManager An instance of revision manager. + * @throws IOException + */ + public static RevisionManager getOpenedRevisionManager(Configuration hbaseConf) throws IOException { + String className = hbaseConf.get(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS, + ZKBasedRevisionManager.class.getName()); + return getOpenedRevisionManager(className, hbaseConf); + } + } Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java?rev=1304536&view=auto ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java (added) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java Fri Mar 23 18:14:36 2012 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; + +/** + * Interface marker to implement RevisionManager as Coprocessor. + * (needs to extend CoprocessorProtocol) + */ +public interface RevisionManagerProtocol extends RevisionManager, + CoprocessorProtocol { + +} Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1304536&r1=1304535&r2=1304536&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Fri Mar 23 18:14:36 2012 @@ -17,6 +17,7 @@ */ package org.apache.hcatalog.hbase.snapshot; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -24,7 +25,7 @@ import java.util.Map; /** * The snapshot for a table and a list of column families. */ -public class TableSnapshot { +public class TableSnapshot implements Serializable { private String name; @@ -35,6 +36,9 @@ public class TableSnapshot { public TableSnapshot(String name, Map cfRevMap, long latestRevision) { this.name = name; + if (cfRevMap == null) { + throw new IllegalArgumentException("revision map cannot be null"); + } this.cfRevisionMap = cfRevMap; this.latestRevision = latestRevision; } Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java?rev=1304536&view=auto ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java (added) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java Fri Mar 23 18:14:36 2012 @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides a revision manager for data stored in HBase that can be used to implement repeatable reads. + * The component is designed to be usable for revision management of data stored in HBase in general, + * independent and not limited to HCatalog. It is used by the HCatalog HBase storage handler, implementation depends on HBase 0.92+. + *

+ * For more information please see + * Snapshots and Repeatable reads for HBase Tables. + * @since 0.4 + */ +package org.apache.hcatalog.hbase.snapshot; Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1304536&r1=1304535&r2=1304536&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Fri Mar 23 18:14:36 2012 @@ -254,8 +254,8 @@ public class ManyMiniCluster { hbaseConf.set("hbase.rootdir", hbaseRoot); hbaseConf.set("hbase.master", "local"); - hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort); - hbaseConf.set("hbase.zookeeper.quorum", "127.0.0.1"); + hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zookeeperPort); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); hbaseConf.setInt("hbase.master.port", findFreePort()); hbaseConf.setInt("hbase.master.info.port", -1); hbaseConf.setInt("hbase.regionserver.port", findFreePort()); @@ -306,7 +306,7 @@ public class ManyMiniCluster { private File workDir; private int numTaskTrackers = 1; private JobConf jobConf; - private HBaseConfiguration hbaseConf; + private Configuration hbaseConf; private HiveConf hiveConf; private boolean miniMRClusterEnabled = true; @@ -329,7 +329,7 @@ public class ManyMiniCluster { return this; } - public Builder hbaseConf(HBaseConfiguration hbaseConf) { + public Builder hbaseConf(Configuration hbaseConf) { this.hbaseConf = hbaseConf; return this; } Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java?rev=1304536&r1=1304535&r2=1304536&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java (original) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java Fri Mar 23 18:14:36 2012 @@ -18,9 +18,18 @@ package org.apache.hcatalog.hbase; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -28,14 +37,6 @@ import org.apache.hadoop.hive.conf.HiveC import org.junit.AfterClass; import org.junit.BeforeClass; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Random; -import java.util.Set; - /** * Base class for HBase Tests which need a mini cluster instance */ @@ -48,6 +49,12 @@ public abstract class SkeletonHBaseTest protected static Map contextMap = new HashMap(); protected static Set tableNames = new HashSet(); + /** + * Allow tests to alter the default MiniCluster configuration. + * (requires static initializer block as all setup here is static) + */ + protected static Configuration testConf = null; + protected void createTable(String tableName, String[] families) { try { HBaseAdmin admin = new HBaseAdmin(getHbaseConf()); @@ -76,6 +83,7 @@ public abstract class SkeletonHBaseTest return name; } + /** * startup an hbase cluster instance before a test suite runs */ @@ -173,9 +181,13 @@ public abstract class SkeletonHBaseTest public void start() { if(usageCount++ == 0) { - cluster = ManyMiniCluster.create(new File(testDir)).build(); + ManyMiniCluster.Builder b = ManyMiniCluster.create(new File(testDir)); + if (testConf != null) { + b.hbaseConf(HBaseConfiguration.create(testConf)); + } + cluster = b.build(); cluster.start(); - hbaseConf = cluster.getHBaseConf(); + this.hbaseConf = cluster.getHBaseConf(); jobConf = cluster.getJobConf(); fileSystem = cluster.getFileSystem(); hiveConf = cluster.getHiveConf(); Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java?rev=1304536&view=auto ============================================================================== --- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java (added) +++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java Fri Mar 23 18:14:36 2012 @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.hbase.snapshot; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hcatalog.hbase.SkeletonHBaseTest; +import org.junit.Assert; +import org.junit.Test; + +public class TestRevisionManagerEndpoint extends SkeletonHBaseTest { + + static { + // test case specific mini cluster settings + testConf = new Configuration(false); + testConf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpoint", + "org.apache.hadoop.hbase.coprocessor.GenericEndpoint"); + testConf.set(RevisionManagerEndpoint.REVISION_MGR_ENDPOINT_IMPL_CLASS, MockRM.class.getName()); + } + + /** + * Mock implementation to test the protocol/serialization + */ + public static class MockRM implements RevisionManager { + + private static class Invocation { + Invocation(String methodName, Object ret, Object... args) { + this.methodName = methodName; + this.args = args; + this.ret = ret; + } + + String methodName; + Object[] args; + Object ret; + + private static boolean equals(Object obj1, Object obj2) { + if (obj1 == obj2) return true; + if (obj1 == null || obj2 == null) return false; + if (obj1 instanceof Transaction || obj1 instanceof TableSnapshot) { + return obj1.toString().equals(obj2.toString()); + } + return obj1.equals(obj2); + } + + @Override + public boolean equals(Object obj) { + Invocation other = (Invocation)obj; + if (this == other) return true; + if (other == null) return false; + if (this.args != other.args) { + if (this.args == null || other.args == null) return false; + if (this.args.length != other.args.length) return false; + for (int i=0; i INSTANCES = new ConcurrentHashMap(); + Invocation lastCall; + boolean isOpen = false; + + private T recordCall(T result, Object...args) { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + lastCall = new Invocation(stackTrace[2].getMethodName(), result, args); + return result; + } + + @Override + public void initialize(Properties properties) { + INSTANCES.put("default", this); + } + + @Override + public void open() throws IOException { + isOpen = true; + } + + @Override + public void close() throws IOException { + isOpen = false; + } + + @Override + public Transaction beginWriteTransaction(String table, + List families) throws IOException { + return recordCall(null, table, families); + } + + @Override + public Transaction beginWriteTransaction(String table, + List families, long keepAlive) throws IOException { + return recordCall(null, table, families, keepAlive); + } + + @Override + public void commitWriteTransaction(Transaction transaction) + throws IOException { + } + + @Override + public void abortWriteTransaction(Transaction transaction) + throws IOException { + } + + @Override + public List getAbortedWriteTransactions(String table, + String columnFamily) throws IOException { + return null; + } + + @Override + public TableSnapshot createSnapshot(String tableName) + throws IOException { + return null; + } + + @Override + public TableSnapshot createSnapshot(String tableName, long revision) + throws IOException { + TableSnapshot ret = new TableSnapshot(tableName, new HashMap(), revision); + return recordCall(ret, tableName, revision); + } + + @Override + public void keepAlive(Transaction transaction) throws IOException { + recordCall(null, transaction); + } + } + + @Test + public void testRevisionManagerProtocol() throws Throwable { + + Configuration conf = getHbaseConf(); + RevisionManager rm = RevisionManagerFactory.getOpenedRevisionManager( + RevisionManagerEndpointClient.class.getName(), conf); + + MockRM mockImpl = MockRM.INSTANCES.get(MockRM.DEFAULT_INSTANCE); + Assert.assertNotNull(mockImpl); + Assert.assertTrue(mockImpl.isOpen); + + Transaction t = new Transaction("t1", Arrays.asList("f1", "f2"), 0, 0); + MockRM.Invocation call = new MockRM.Invocation("keepAlive", null, t); + rm.keepAlive(t); + Assert.assertEquals(call.methodName, call, mockImpl.lastCall); + + t = new Transaction("t2", Arrays.asList("f21", "f22"), 0, 0); + call = new MockRM.Invocation("beginWriteTransaction", null, t.getTableName(), t.getColumnFamilies()); + call.ret = rm.beginWriteTransaction(t.getTableName(), t.getColumnFamilies()); + Assert.assertEquals(call.methodName, call, mockImpl.lastCall); + + call = new MockRM.Invocation("createSnapshot", null, "t3", 1L); + call.ret = rm.createSnapshot("t3", 1); + Assert.assertEquals(call.methodName, call, mockImpl.lastCall); + + } + +}