Return-Path: X-Original-To: apmail-incubator-gora-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-gora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C5C9778F4 for ; Thu, 17 Nov 2011 13:37:59 +0000 (UTC) Received: (qmail 11553 invoked by uid 500); 17 Nov 2011 13:37:59 -0000 Delivered-To: apmail-incubator-gora-commits-archive@incubator.apache.org Received: (qmail 11523 invoked by uid 500); 17 Nov 2011 13:37:59 -0000 Mailing-List: contact gora-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: gora-dev@incubator.apache.org Delivered-To: mailing list gora-commits@incubator.apache.org Received: (qmail 11516 invoked by uid 99); 17 Nov 2011 13:37:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Nov 2011 13:37:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Nov 2011 13:37:57 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 84B5523889D7; Thu, 17 Nov 2011 13:37:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1203184 - in /incubator/gora/trunk: ./ gora-hbase/src/main/java/org/apache/gora/hbase/store/ Date: Thu, 17 Nov 2011 13:37:36 -0000 To: gora-commits@incubator.apache.org From: lewismc@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111117133736.84B5523889D7@eris.apache.org> Author: lewismc Date: Thu Nov 17 13:37:35 2011 New Revision: 1203184 URL: http://svn.apache.org/viewvc?rev=1203184&view=rev Log: commit to address GORA-56, update to changes.txt and thank you to Ferdy Added: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java Modified: incubator/gora/trunk/CHANGES.txt incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Modified: incubator/gora/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/gora/trunk/CHANGES.txt?rev=1203184&r1=1203183&r2=1203184&view=diff ============================================================================== --- incubator/gora/trunk/CHANGES.txt (original) +++ incubator/gora/trunk/CHANGES.txt Thu Nov 17 13:37:35 2011 @@ -2,6 +2,8 @@ Gora Change Log Trunk (unreleased changes): +* GORA-56 HBaseStore is not thread safe. (Ferdy via lewismc) + * GORA-48. HBaseStore initialization of table without configuration in constructor will throw Exception (Ferdy via lewismc) * GORA-47&46. fix tar ant target & Add nightly target to build.xml respectively (lewismc) Modified: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java?rev=1203184&r1=1203183&r2=1203184&view=diff ============================================================================== --- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java (original) +++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java Thu Nov 17 13:37:35 2011 @@ -24,24 +24,31 @@ import java.util.Arrays; */ class HBaseColumn { - String tableName; - byte[] family; - byte[] qualifier; + final String tableName; + final byte[] family; + final byte[] qualifier; public HBaseColumn(String tableName, byte[] family, byte[] qualifier) { this.tableName = tableName; - this.family = family; - this.qualifier = qualifier; + this.family = family==null ? null : Arrays.copyOf(family, family.length); + this.qualifier = qualifier==null ? null : + Arrays.copyOf(qualifier, qualifier.length); } public String getTableName() { return tableName; } + /** + * @return the family (internal array returned; do not modify) + */ public byte[] getFamily() { return family; } + /** + * @return the qualifer (internal array returned; do not modify) + */ public byte[] getQualifier() { return qualifier; } Modified: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java?rev=1203184&r1=1203183&r2=1203184&view=diff ============================================================================== --- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java (original) +++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseMapping.java Thu Nov 17 13:37:35 2011 @@ -28,37 +28,30 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.util.Bytes; /** - * Mapping definitions for HBase + * Mapping definitions for HBase. Thread safe. */ public class HBaseMapping { - private Map tableDescriptors - = new HashMap(); + private final Map tableDescriptors; //name of the primary table - private String tableName; + private final String tableName; // a map from field name to hbase column - private Map columnMap = - new HashMap(); + private final Map columnMap; - public HBaseMapping() { - } - - public void setTableName(String tableName) { + public HBaseMapping(Map tableDescriptors, + String tableName, Map columnMap) { + super(); + this.tableDescriptors = tableDescriptors; this.tableName = tableName; + this.columnMap = columnMap; } - + public String getTableName() { return tableName; } - public void addTable(String tableName) { - if(!tableDescriptors.containsKey(tableName)) { - tableDescriptors.put(tableName, new HTableDescriptor(tableName)); - } - } - public HTableDescriptor getTable() { return getTable(tableName); } @@ -67,49 +60,94 @@ public class HBaseMapping { return tableDescriptors.get(tableName); } - public void addColumnFamily(String tableName, String familyName - , String compression, String blockCache, String blockSize, String bloomFilter - , String maxVersions, String timeToLive, String inMemory) { - - HColumnDescriptor columnDescriptor = addColumnFamily(tableName, familyName); - - if(compression != null) - columnDescriptor.setCompressionType(Algorithm.valueOf(compression)); - if(blockCache != null) - columnDescriptor.setBlockCacheEnabled(Boolean.parseBoolean(blockCache)); - if(blockSize != null) - columnDescriptor.setBlocksize(Integer.parseInt(blockSize)); - if(bloomFilter != null) - columnDescriptor.setBloomFilterType(BloomType.valueOf(bloomFilter)); - if(maxVersions != null) - columnDescriptor.setMaxVersions(Integer.parseInt(maxVersions)); - if(timeToLive != null) - columnDescriptor.setTimeToLive(Integer.parseInt(timeToLive)); - if(inMemory != null) - columnDescriptor.setInMemory(Boolean.parseBoolean(inMemory)); - - getTable(tableName).addFamily(columnDescriptor); - } - - public HColumnDescriptor addColumnFamily(String tableName, String familyName) { - HTableDescriptor tableDescriptor = getTable(tableName); - HColumnDescriptor columnDescriptor = tableDescriptor.getFamily(Bytes.toBytes(familyName)); - if(columnDescriptor == null) { - columnDescriptor = new HColumnDescriptor(familyName); - tableDescriptor.addFamily(columnDescriptor); - } - return columnDescriptor; - } - - public void addField(String fieldName, String tableName, String family, String qualifier) { - byte[] familyBytes = Bytes.toBytes(family); - byte[] qualifierBytes = qualifier == null ? null : Bytes.toBytes(qualifier); - - HBaseColumn column = new HBaseColumn(tableName, familyBytes, qualifierBytes); - columnMap.put(fieldName, column); - } - public HBaseColumn getColumn(String fieldName) { return columnMap.get(fieldName); } + + /** + * A builder for creating the mapper. This will allow building a thread safe + * {@link HBaseMapping} using simple immutabilty. + * + */ + public static class HBaseMappingBuilder { + private Map tableDescriptors + = new HashMap(); + private String tableName; + private Map columnMap = + new HashMap(); + + public void addTable(String tableName) { + if(!tableDescriptors.containsKey(tableName)) { + tableDescriptors.put(tableName, new HTableDescriptor(tableName)); + } + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void addColumnFamily(String tableName, String familyName, + String compression, String blockCache, String blockSize, + String bloomFilter ,String maxVersions, String timeToLive, + String inMemory) { + + HColumnDescriptor columnDescriptor = addColumnFamily(tableName, + familyName); + + if(compression != null) + columnDescriptor.setCompressionType(Algorithm.valueOf(compression)); + if(blockCache != null) + columnDescriptor.setBlockCacheEnabled(Boolean.parseBoolean(blockCache)); + if(blockSize != null) + columnDescriptor.setBlocksize(Integer.parseInt(blockSize)); + if(bloomFilter != null) + columnDescriptor.setBloomFilterType(BloomType.valueOf(bloomFilter)); + if(maxVersions != null) + columnDescriptor.setMaxVersions(Integer.parseInt(maxVersions)); + if(timeToLive != null) + columnDescriptor.setTimeToLive(Integer.parseInt(timeToLive)); + if(inMemory != null) + columnDescriptor.setInMemory(Boolean.parseBoolean(inMemory)); + + getTable(tableName).addFamily(columnDescriptor); + } + + public HTableDescriptor getTable(String tableName) { + return tableDescriptors.get(tableName); + } + + public HColumnDescriptor addColumnFamily(String tableName, + String familyName) { + HTableDescriptor tableDescriptor = getTable(tableName); + HColumnDescriptor columnDescriptor = tableDescriptor.getFamily( + Bytes.toBytes(familyName)); + if(columnDescriptor == null) { + columnDescriptor = new HColumnDescriptor(familyName); + tableDescriptor.addFamily(columnDescriptor); + } + return columnDescriptor; + } + + public void addField(String fieldName, String tableName, String family, + String qualifier) { + byte[] familyBytes = Bytes.toBytes(family); + byte[] qualifierBytes = qualifier == null ? null : + Bytes.toBytes(qualifier); + + HBaseColumn column = new HBaseColumn(tableName, familyBytes, + qualifierBytes); + columnMap.put(fieldName, column); + } + + /** + * @return A newly constructed mapping. + */ + public HBaseMapping build() { + return new HBaseMapping(tableDescriptors, tableName, columnMap); + } + } } \ No newline at end of file Modified: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1203184&r1=1203183&r2=1203184&view=diff ============================================================================== --- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original) +++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Thu Nov 17 13:37:35 2011 @@ -43,6 +43,7 @@ import org.apache.commons.logging.LogFac import org.apache.gora.hbase.query.HBaseGetResult; import org.apache.gora.hbase.query.HBaseQuery; import org.apache.gora.hbase.query.HBaseScannerResult; +import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder; import org.apache.gora.hbase.util.HBaseByteInterface; import org.apache.gora.persistency.ListGenericArray; import org.apache.gora.persistency.Persistent; @@ -62,7 +63,6 @@ import org.apache.hadoop.hbase.HTableDes import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -74,8 +74,7 @@ import org.jdom.Element; import org.jdom.input.SAXBuilder; /** - * DataStore for HBase. - *

Note: HBaseStore is not yet thread-safe. + * DataStore for HBase. Thread safe. * */ public class HBaseStore extends DataStoreBase @@ -89,15 +88,15 @@ implements Configurable { private static final String DEPRECATED_MAPPING_FILE = "hbase-mapping.xml"; public static final String DEFAULT_MAPPING_FILE = "gora-hbase-mapping.xml"; - private HBaseAdmin admin; + private volatile HBaseAdmin admin; - private HTable table; + private volatile HBaseTableConnection table; - private Configuration conf; + private volatile Configuration conf; - private boolean autoCreateSchema = true; + private final boolean autoCreateSchema = true; - private HBaseMapping mapping; + private volatile HBaseMapping mapping; public HBaseStore() { } @@ -131,7 +130,7 @@ implements Configurable { createSchema(); } - table = new HTable(conf, mapping.getTableName()); + table = new HBaseTableConnection(getConf(), mapping.getTableName(), true); } @Override @@ -152,9 +151,6 @@ implements Configurable { @Override public void deleteSchema() throws IOException { if(!admin.tableExists(mapping.getTableName())) { - if(table != null) { - table.getWriteBuffer().clear(); - } return; } admin.disableTable(mapping.getTableName()); @@ -521,7 +517,7 @@ implements Configurable { @SuppressWarnings("unchecked") private HBaseMapping readMapping(String filename) throws IOException { - HBaseMapping mapping = new HBaseMapping(); + HBaseMappingBuilder mappingBuilder = new HBaseMappingBuilder(); try { SAXBuilder builder = new SAXBuilder(); @@ -532,7 +528,7 @@ implements Configurable { List tableElements = root.getChildren("table"); for(Element tableElement : tableElements) { String tableName = tableElement.getAttributeValue("name"); - mapping.addTable(tableName); + mappingBuilder.addTable(tableName); List fieldElements = tableElement.getChildren("field"); for(Element fieldElement : fieldElements) { @@ -545,28 +541,33 @@ implements Configurable { String timeToLive = fieldElement.getAttributeValue("timeToLive"); String inMemory = fieldElement.getAttributeValue("inMemory"); - mapping.addColumnFamily(tableName, familyName, compression, blockCache, blockSize - , bloomFilter, maxVersions, timeToLive, inMemory); + mappingBuilder.addColumnFamily(tableName, familyName, compression, + blockCache, blockSize, bloomFilter, maxVersions, timeToLive, + inMemory); } } List classElements = root.getChildren("class"); for(Element classElement: classElements) { - if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName()) + if(classElement.getAttributeValue("keyClass").equals( + keyClass.getCanonicalName()) && classElement.getAttributeValue("name").equals( persistentClass.getCanonicalName())) { - String tableName = getSchemaName(classElement.getAttributeValue("table"), persistentClass); - mapping.addTable(tableName); - mapping.setTableName(tableName); + String tableName = getSchemaName( + classElement.getAttributeValue("table"), persistentClass); + mappingBuilder.addTable(tableName); + mappingBuilder.setTableName(tableName); List fields = classElement.getChildren("field"); for(Element field:fields) { String fieldName = field.getAttributeValue("name"); String family = field.getAttributeValue("family"); String qualifier = field.getAttributeValue("qualifier"); - mapping.addField(fieldName, mapping.getTableName(), family, qualifier); - mapping.addColumnFamily(mapping.getTableName(), family);//implicit family definition + mappingBuilder.addField(fieldName, mappingBuilder.getTableName(), + family, qualifier); + mappingBuilder.addColumnFamily(mappingBuilder.getTableName(), + family);//implicit family definition } break; @@ -578,14 +579,12 @@ implements Configurable { throw new IOException(ex); } - return mapping; + return mappingBuilder.build(); } @Override public void close() throws IOException { - flush(); - if(table != null) - table.close(); + table.close(); } @Override Added: incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java?rev=1203184&view=auto ============================================================================== --- incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java (added) +++ incubator/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java Thu Nov 17 13:37:35 2011 @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.hbase.store; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowLock; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Thread safe implementation to connect to a HBase table. + * + */ +public class HBaseTableConnection implements HTableInterface{ + /* + * The current implementation uses ThreadLocal HTable instances. It keeps + * track of the floating instances in order to correctly flush and close + * the connection when it is closed. HBase itself provides a utility called + * HTablePool for maintaining a pool of tables, but there are still some + * drawbacks that are only solved in later releases. + * + */ + + private final Configuration conf; + private final ThreadLocal tables; + private final BlockingQueue pool = new LinkedBlockingQueue(); + private final boolean autoflush; + private final String tableName; + + /** + * Instantiate new connection. + * + * @param conf + * @param tableName + * @param autoflush + * @throws IOException + */ + public HBaseTableConnection(Configuration conf, String tableName, + boolean autoflush) throws IOException { + this.conf = conf; + this.tables = new ThreadLocal(); + this.tableName = tableName; + this.autoflush = autoflush; + } + + private HTable getTable() throws IOException { + HTable table = tables.get(); + if (table == null) { + table = new HTable(conf, tableName); + table.setAutoFlush(autoflush); + pool.add(table); //keep track + tables.set(table); + } + return table; + } + + @Override + public void close() throws IOException { + // Flush and close all instances. + // (As an extra safeguard one might employ a shared variable i.e. 'closed' + // in order to prevent further table creation but for now we assume that + // once close() is called, clients are no longer using it). + for (HTable table : pool) { + table.flushCommits(); + table.close(); + } + } + + @Override + public byte[] getTableName() { + return Bytes.toBytes(tableName); + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public boolean isAutoFlush() { + return autoflush; + } + + /** + * getStartEndKeys provided by {@link HTable} but not {@link HTableInterface}. + * @see HTable#getStartEndKeys() + */ + public Pair getStartEndKeys() throws IOException { + return getTable().getStartEndKeys(); + } + /** + * getRegionLocation provided by {@link HTable} but not + * {@link HTableInterface}. + * @see HTable#getRegionLocation(byte[]) + */ + public HRegionLocation getRegionLocation(final byte[] bs) throws IOException { + return getTable().getRegionLocation(bs); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return getTable().getTableDescriptor(); + } + + @Override + public boolean exists(Get get) throws IOException { + return getTable().exists(get); + } + + @Override + public void batch(List actions, Object[] results) throws IOException, + InterruptedException { + getTable().batch(actions, results); + } + + @Override + public Object[] batch(List actions) throws IOException, + InterruptedException { + return getTable().batch(actions); + } + + @Override + public Result get(Get get) throws IOException { + return getTable().get(get); + } + + @Override + public Result[] get(List gets) throws IOException { + return getTable().get(gets); + } + + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + return getTable().getRowOrBefore(row, family); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return getTable().getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + return getTable().getScanner(family); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + return getTable().getScanner(family, qualifier); + } + + @Override + public void put(Put put) throws IOException { + getTable().put(put); + } + + @Override + public void put(List puts) throws IOException { + getTable().put(puts); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + return getTable().checkAndPut(row, family, qualifier, value, put); + } + + @Override + public void delete(Delete delete) throws IOException { + getTable().delete(delete); + } + + @Override + public void delete(List deletes) throws IOException { + getTable().delete(deletes); + + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + return getTable().checkAndDelete(row, family, qualifier, value, delete); + } + + @Override + public Result increment(Increment increment) throws IOException { + return getTable().increment(increment); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount) throws IOException { + return getTable().incrementColumnValue(row, family, qualifier, amount); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount, boolean writeToWAL) throws IOException { + return getTable().incrementColumnValue(row, family, qualifier, amount, + writeToWAL); + } + + @Override + public void flushCommits() throws IOException { + getTable().flushCommits(); + } + + @Override + public RowLock lockRow(byte[] row) throws IOException { + return getTable().lockRow(row); + } + + @Override + public void unlockRow(RowLock rl) throws IOException { + getTable().unlockRow(rl); + } +}