From: seelmann@apache.org
To: commits@directory.apache.org
Reply-To: dev@directory.apache.org
Date: Sun, 21 Feb 2010 22:59:20 -0000
Subject: svn commit: r912434 [1/2] - in /directory/sandbox/seelmann/hbase-partition/src: main/java/org/apache/directory/server/core/partition/hbase/ main/java/org/apache/directory/server/core/partition/hbase/cursor/ main/java/org/apache/directory/server/core/pa...

Author: seelmann
Date: Sun Feb 21 22:59:19 2010
New Revision: 912434

URL: http://svn.apache.org/viewvc?rev=912434&view=rev
Log:
o added Map/Reduce job for LDIF import and indexing
o added pool for HTable objects
o removed counters for indices (they don't scale)

Added:
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTablePool.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/GetPerformanceEvaluation.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/LdifImportAndIndexIT.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/RemoteLdifImport.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/RemoteRunner.java
    directory/sandbox/seelmann/hbase-partition/src/test/resources/testdata-5.ldif
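[Editor's sketch -- not part of the commit. The classes added above form a
two-pass bulk load: LdifImporter writes master-table rows from LDIF splits,
and IndexBuilder then scans the master table to build the tree and index
tables. A minimal, hypothetical driver against the Hadoop/HBase 0.20-era
APIs used in this sandbox follows; the suffix, table prefix, input path and
index lists are placeholders, and Text/IntWritable as the mapper output
types is an assumption. The Removed/Modified file lists continue below.]

    // hypothetical driver, exception handling omitted
    Configuration conf = new HBaseConfiguration();
    conf.set( LdifImporter.SUFFIX, "dc=example,dc=com" );   // placeholder
    conf.set( LdifImporter.TABLE_PREFIX, "test_" );         // placeholder
    conf.set( LdifImporter.NAME_COMPONENT_COUNT, "2" );     // import the suffix level first

    Job importJob = new Job( conf, "ldif-import" );
    importJob.setJarByClass( LdifImporter.class );
    importJob.setMapperClass( LdifImporter.class );
    importJob.setInputFormatClass( LdifInputFormat.class );
    importJob.setOutputFormatClass( NullOutputFormat.class );
    importJob.setNumReduceTasks( 0 );                       // map-only
    FileInputFormat.addInputPath( importJob, new Path( "/import/testdata.ldif" ) );
    importJob.waitForCompletion( true );
    // re-run with NAME_COMPONENT_COUNT = 3, 4, ... for deeper tree levels

    // second pass: scan the master table ("<prefix>master") and build indices
    conf.set( IndexBuilder.COLUMN_INDICES, "cn,sn" );       // placeholder
    conf.set( IndexBuilder.ROW_INDICES, "objectClass" );    // placeholder
    Job indexJob = new Job( conf, "index-build" );
    indexJob.setJarByClass( IndexBuilder.class );
    indexJob.setNumReduceTasks( 0 );
    TableMapReduceUtil.initTableMapperJob( "test_master", new Scan(),
        IndexBuilder.class, Text.class, IntWritable.class, indexJob );
    indexJob.waitForCompletion( true );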
Removed:
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexMapper.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/RowCounterMapper.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/SimpleMapper.java
Modified:
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTableHelper.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseClusterTestCaseAdapter.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseDistributedRunner.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseEmbeddedRunner.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/AbstractHBaseTableTest.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableTest.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTableTest.java
    directory/sandbox/seelmann/hbase-partition/src/test/resources/log4j.properties

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
---
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java Sun Feb 21 22:59:19 2010 @@ -22,6 +22,7 @@ import java.io.File; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,7 +40,7 @@ import org.apache.directory.server.core.partition.hbase.index.HBaseSubAliasIndex; import org.apache.directory.server.core.partition.hbase.index.HBaseSubLevelIndex; import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex; -import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTableBase; +import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable; import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable; import org.apache.directory.server.xdbm.Index; import org.apache.directory.server.xdbm.IndexCursor; @@ -67,6 +68,8 @@ public class HBaseStore implements Store { + public static final String STRONG_CONSISTENCY_PROPERTY = "org.apache.directory.strong.consistency"; + private String tablePrefix; private LdapDN suffixDn; @@ -80,8 +83,8 @@ private HBaseMasterTable masterTable; - private Map> userIndices = new HashMap>(); - private Index presenceIndex; + private Map> userIndices = new HashMap>(); + private HBasePresenceIndex presenceIndex; private Index ndnIndex; private Index oneLevelIndex; private Index subLevelIndex; @@ -184,7 +187,7 @@ String oid = getAttributeTypeOid( attr ); if ( userIndices.containsKey( oid ) ) { - HBaseUserIndex index = userIndices.get( oid ); + HBaseUserIndex index = userIndices.get( oid ); for ( Value value : attribute ) { index.add( value.getBytes(), id ); @@ -224,7 +227,7 @@ @SuppressWarnings("unchecked") public void addIndex( Index index ) { - this.userIndices.put( index.getAttributeId(), ( HBaseUserIndex ) index ); + this.userIndices.put( index.getAttributeId(), ( HBaseUserIndex ) index ); } @@ -246,7 +249,7 @@ String oid = getAttributeTypeOid( attr ); if ( userIndices.containsKey( oid ) ) { - HBaseUserIndex index = userIndices.get( oid ); + HBaseUserIndex index = userIndices.get( oid ); for ( Value value : attribute ) { index.drop( value.getBytes(), id ); @@ -350,7 +353,7 @@ } - public Index getPresenceIndex() + public HBasePresenceIndex getPresenceIndex() { return presenceIndex; } @@ -404,7 +407,7 @@ } - public Index getUserIndex( String id ) throws IndexNotFoundException + public HBaseUserIndex getUserIndex( String id ) throws IndexNotFoundException { id = getAttributeTypeOid( id ); @@ -432,7 +435,7 @@ public Set> getUserIndices() { - throw new UnsupportedOperationException(); + return new HashSet>( userIndices.values() ); } @@ -485,7 +488,7 @@ String oid = getAttributeTypeOid( attr ); if ( userIndices.containsKey( oid ) ) { - HBaseUserIndex index = userIndices.get( oid ); + HBaseUserIndex index = userIndices.get( oid ); for ( Value value : attribute ) { index.drop( value.getBytes(), id ); @@ -554,7 +557,7 @@ String oid = getAttributeTypeOid( attr ); if ( userIndices.containsKey( oid ) ) { - HBaseUserIndex index = userIndices.get( oid ); + HBaseUserIndex index = userIndices.get( oid ); for ( Value value : attribute ) { index.add( value.getBytes(), id ); @@ -645,7 +648,7 @@ public void setPresenceIndex( Index presenceIndex ) throws Exception { - this.presenceIndex = presenceIndex; + this.presenceIndex = ( HBasePresenceIndex ) 
presenceIndex; } Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java Sun Feb 21 22:59:19 2010 @@ -168,30 +168,30 @@ { /* * this case is relevant for substring filters with an initial pattern, e.g. (cn=test*) - * - row filter is "^#test.*$" + * - row filter is "^=test.*$" */ start = indexTable.getScanKey( node.getInitial(), null ); stop = indexTable.getScanKey( node.getInitial(), HBaseIndexTable.VALUE_SCAN_STOP ); - rowFilterPattern = Pattern.compile( "^#" + Utils.getValuePattern( node, store ) + "$" ); + rowFilterPattern = Pattern.compile( "^=" + Utils.getValuePattern( node, store ) + "$" ); } else { /* * this case is relevant for substring filters w/o an initial pattern, e.g. (cn=*test*) * unfortunately we need to scan the whole index, but can set a row filter - * - row filter is "^#.*test.*$" + * - row filter is "^=.*test.*$" */ start = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_START, null ); stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null ); - rowFilterPattern = Pattern.compile( "^#" + Utils.getValuePattern( node, store ) + "$" ); + rowFilterPattern = Pattern.compile( "^=" + Utils.getValuePattern( node, store ) + "$" ); } } else if ( value != null ) { /* * this case is relevant for greater than filters (the start value is set by before(IndexEntry)), e.g. (cn>=test) - * - start row is "#test" - * - stop row is "#0xFF" + * - start row is "=test" + * - stop row is "=0xFF" */ start = indexTable.getScanKey( value, null ); stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null ); @@ -203,8 +203,8 @@ /* * this case is relevant for less than filters, e.g. 
(cn<=test) * unfortunately we need to scan the whole index - * - start row is "#0x00" - * - stop row is "#0xFF" + * - start row is "=0x00" + * - stop row is "=0xFF" */ start = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_START, null ); stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null ); @@ -234,7 +234,7 @@ if ( iterator.hasNext() ) { Result result = iterator.next(); - value = indexTable.getValueFromCountKey( result.getRow() ); + value = indexTable.extractValueFromEqualsKey( result.getRow() ); candidates = indexTable.getColumnCandidates( result ).iterator(); } else Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java Sun Feb 21 22:59:19 2010 @@ -27,7 +27,6 @@ import org.apache.directory.server.core.partition.hbase.HBaseStore; import org.apache.directory.server.core.partition.hbase.cursor.HBasePresenceIndexCursor; import org.apache.directory.server.core.partition.hbase.table.HBasePresenceIndexTable; -import org.apache.directory.server.xdbm.IndexCursor; /** @@ -80,7 +79,7 @@ @Override - public IndexCursor forwardCursor( String key ) throws Exception + public HBasePresenceIndexCursor forwardCursor( String key ) throws Exception { return new HBasePresenceIndexCursor( getPresenceIndexTable( key ) ); } @@ -108,8 +107,8 @@ } else { - HBasePresenceIndexTable presenceIndexTable = new HBasePresenceIndexTable( attributeTypeOid, store - .getSchemaManager(), store.getTablePrefix(), store.getConfiguration(), getCacheSize() ); + HBasePresenceIndexTable presenceIndexTable = new HBasePresenceIndexTable( attributeTypeOid, store, + getCacheSize() ); presenceIndexTables.put( attributeTypeOid, presenceIndexTable ); return presenceIndexTable; } Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java Sun Feb 21 22:59:19 2010 @@ -20,10 +20,8 @@ package org.apache.directory.server.core.partition.hbase.index; -import org.apache.directory.server.core.entry.ServerEntry; import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserColumnIndexCursor; import org.apache.directory.server.core.partition.hbase.table.HBaseColumnIndexTable; -import org.apache.directory.server.xdbm.IndexCursor; import org.apache.directory.shared.ldap.filter.SubstringNode; @@ -58,27 
+56,27 @@ @Override - public IndexCursor forwardCursor( Object value ) throws Exception + public HBaseUserColumnIndexCursor forwardCursor( Object value ) throws Exception { return new HBaseUserColumnIndexCursor( getAttributeId(), value, getIndexTable(), store ); } @Override - public IndexCursor forwardCursor() throws Exception + public HBaseUserColumnIndexCursor forwardCursor() throws Exception { return new HBaseUserColumnIndexCursor( getAttributeId(), getIndexTable(), store ); } - public IndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception + public HBaseUserColumnIndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception { return new HBaseUserColumnIndexCursor( getAttributeId(), node, getIndexTable(), store ); } @Override - protected HBaseColumnIndexTable getIndexTable() throws Exception + public HBaseColumnIndexTable getIndexTable() throws Exception { if ( indexTable == null ) { Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java Sun Feb 21 22:59:19 2010 @@ -24,7 +24,7 @@ import org.apache.directory.server.core.entry.ServerEntry; import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserIndexReverseCursor; -import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTableBase; +import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable; import org.apache.directory.server.core.partition.hbase.xdbmext.IndexSubstringExtension; import org.apache.directory.server.xdbm.IndexCursor; import org.apache.directory.shared.ldap.filter.SubstringNode; @@ -37,7 +37,7 @@ * @author Apache Directory Project * @version $Rev$, $Date$ */ -public abstract class HBaseUserIndex extends AbstractHBaseIndex +public abstract class HBaseUserIndex extends AbstractHBaseIndex implements IndexSubstringExtension { @@ -217,6 +217,6 @@ } - protected abstract T getIndexTable() throws Exception; + public abstract T getIndexTable() throws Exception; } Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java Sun Feb 21 22:59:19 2010 @@ -20,10 +20,8 @@ package org.apache.directory.server.core.partition.hbase.index; -import org.apache.directory.server.core.entry.ServerEntry; import 
org.apache.directory.server.core.partition.hbase.cursor.HBaseUserRowIndexCursor; import org.apache.directory.server.core.partition.hbase.table.HBaseRowIndexTable; -import org.apache.directory.server.xdbm.IndexCursor; import org.apache.directory.shared.ldap.filter.SubstringNode; @@ -58,32 +56,31 @@ @Override - public IndexCursor forwardCursor( Object value ) throws Exception + public HBaseUserRowIndexCursor forwardCursor( Object value ) throws Exception { return new HBaseUserRowIndexCursor( getAttributeId(), value, getIndexTable(), store ); } @Override - public IndexCursor forwardCursor() throws Exception + public HBaseUserRowIndexCursor forwardCursor() throws Exception { return new HBaseUserRowIndexCursor( getAttributeId(), getIndexTable(), store ); } - public IndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception + public HBaseUserRowIndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception { return new HBaseUserRowIndexCursor( getAttributeId(), node, getIndexTable(), store ); } @Override - protected HBaseRowIndexTable getIndexTable() throws Exception + public HBaseRowIndexTable getIndexTable() throws Exception { if ( indexTable == null ) { - indexTable = new HBaseRowIndexTable( getAttributeId(), store.getSchemaManager(), store.getTablePrefix(), - store.getConfiguration(), getCacheSize() ); + indexTable = new HBaseRowIndexTable( getAttributeId(), store, getCacheSize() ); } return indexTable; } Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java?rev=912434&view=auto ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java (added) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java Sun Feb 21 22:59:19 2010 @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.server.core.partition.hbase.mapreduce; + + +import java.io.IOException; + +import org.apache.directory.server.core.entry.ServerEntry; +import org.apache.directory.server.core.partition.hbase.HBaseStore; +import org.apache.directory.server.core.partition.hbase.index.HBaseUserColumnIndex; +import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex; +import org.apache.directory.server.core.partition.hbase.index.HBaseUserRowIndex; +import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable; +import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable; +import org.apache.directory.shared.ldap.entry.EntryAttribute; +import org.apache.directory.shared.ldap.entry.Value; +import org.apache.directory.shared.ldap.name.LdapDN; +import org.apache.directory.shared.ldap.schema.SchemaManager; +import org.apache.directory.shared.ldap.schema.loader.ldif.JarLdifSchemaLoader; +import org.apache.directory.shared.ldap.schema.manager.impl.DefaultSchemaManager; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + + +/** + * Mapper that builds indices. + * + * @author Apache Directory Project + * @version $Rev$, $Date$ + */ +public class IndexBuilder extends TableMapper +{ + + private static enum Counters + { + INDEXED_ENTRIES + } + + public static final String COLUMN_INDICES = "org.apache.directory.column.indices"; + public static final String ROW_INDICES = "org.apache.directory.row.indices"; + public static final String SUFFIX = "org.apache.directory.suffix"; + public static final String TABLE_PREFIX = "org.apache.directory.table.prefix"; + + private HBaseStore store; + + + protected void setup( Mapper.Context context ) + throws IOException, InterruptedException + { + try + { + JarLdifSchemaLoader schemaLoader = new JarLdifSchemaLoader(); + SchemaManager schemaManager = new DefaultSchemaManager( schemaLoader ); + schemaManager.loadAllEnabled(); + + LdapDN suffixDn = new LdapDN( context.getConfiguration().get( SUFFIX ) ); + suffixDn.normalize( schemaManager.getNormalizerMapping() ); + + store = new HBaseStore(); + + String columnIndices = context.getConfiguration().get( COLUMN_INDICES ); + String[] columnIndicesSplitted = columnIndices.split( "," ); + for ( String columnIndex : columnIndicesSplitted ) + { + HBaseUserColumnIndex index = new HBaseUserColumnIndex(); + String oid = schemaManager.getAttributeTypeRegistry().getOidByName( columnIndex ); + index.setAttributeId( oid ); + index.setStore( store ); + store.addIndex( index ); + + } + String rowIndices = context.getConfiguration().get( ROW_INDICES ); + String[] rowIndicesSplitted = rowIndices.split( "," ); + for ( String rowIndex : rowIndicesSplitted ) + { + HBaseUserRowIndex index = new HBaseUserRowIndex(); + String oid = schemaManager.getAttributeTypeRegistry().getOidByName( rowIndex ); + index.setAttributeId( oid ); + index.setStore( store ); + store.addIndex( index ); + } + + store.setSuffixDn( suffixDn.getName() ); + store.setCacheSize( 100 ); + String tablePrefix = context.getConfiguration().get( TABLE_PREFIX ); + store.setTablePrefix( tablePrefix ); + store.init( schemaManager ); + store.getConfiguration().setBoolean( HBaseStore.STRONG_CONSISTENCY_PROPERTY, false ); +
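        // [Editor's note -- not part of the commit. Both properties toggled
        // here are introduced in this revision. Judging from the LdifImporter
        // counterpart below, STRONG_CONSISTENCY_PROPERTY=false presumably
        // relaxes read-back checks during bulk writes, while
        // MAINTAIN_COUNTERS_PROPERTY gates the one-level/sub-level counter
        // updates in HBaseMasterTable. IndexBuilder keeps the counters on
        // (true, next line) because it is the pass that writes the tree table.]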
store.getConfiguration().setBoolean( HBaseMasterTable.MAINTAIN_COUNTERS_PROPERTY, true ); + } + catch ( Throwable e ) + { + e.printStackTrace(); + throw new IOException( e ); + } + } + + + protected void cleanup( Mapper.Context context ) + throws IOException, InterruptedException + { + try + { + store.destroy(); + } + catch ( Throwable e ) + { + e.printStackTrace(); + throw new IOException( e ); + } + } + + + @Override + public void map( ImmutableBytesWritable key, Result result, Context context ) throws IOException, + InterruptedException + { + try + { + // write to tree table + Long id = Bytes.toLong( key.get() ); + ServerEntry entry = store.getMasterTable().convertToServerEntry( id, result ); + store.getMasterTable().addToTree( id, entry ); + + // write index tables + for ( EntryAttribute attribute : entry ) + { + String attr = attribute.getId(); + String oid = store.getSchemaManager().getAttributeTypeRegistry().getOidByName( attr ); + if ( store.hasUserIndexOn( oid ) ) + { + HBaseUserIndex index = store.getUserIndex( oid ); + for ( Value value : attribute ) + { + index.add( value.getBytes(), id ); + } + store.getPresenceIndex().add( oid, id ); + } + } + + context.getCounter( Counters.INDEXED_ENTRIES ).increment( 1 ); + } + catch ( Exception e ) + { + System.err.println( "Error indexing entry id=" + Bytes.toLong( key.get() ) ); + System.err.println( ">>>" + result + "<<<" ); + e.printStackTrace(); + } + } + +} Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java?rev=912434&view=auto ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java (added) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java Sun Feb 21 22:59:19 2010 @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.directory.server.core.partition.hbase.mapreduce; + + +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import javax.naming.NamingException; + +import org.apache.directory.server.constants.ServerDNConstants; +import org.apache.directory.server.core.entry.DefaultServerEntry; +import org.apache.directory.server.core.entry.ServerEntry; +import org.apache.directory.server.core.partition.hbase.HBaseStore; +import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable; +import org.apache.directory.shared.ldap.constants.SchemaConstants; +import org.apache.directory.shared.ldap.csn.CsnFactory; +import org.apache.directory.shared.ldap.ldif.LdifEntry; +import org.apache.directory.shared.ldap.ldif.LdifReader; +import org.apache.directory.shared.ldap.name.LdapDN; +import org.apache.directory.shared.ldap.schema.SchemaManager; +import org.apache.directory.shared.ldap.schema.loader.ldif.JarLdifSchemaLoader; +import org.apache.directory.shared.ldap.schema.manager.impl.DefaultSchemaManager; +import org.apache.directory.shared.ldap.util.DateUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + + +/** + * Mapper that imports LDIF. + * + * @author Apache Directory Project + * @version $Rev$, $Date$ + */ +public class LdifImporter extends Mapper +{ + + private static enum Counters + { + IMPORTED_ENTRIES + } + + public static final String NAME_COMPONENT_COUNT = "org.apache.directory.name.component.count"; + public static final String SUFFIX = "org.apache.directory.suffix"; + public static final String TABLE_PREFIX = "org.apache.directory.table.prefix"; + + private static final CsnFactory CSN_FACTORY = new CsnFactory( 0 ); + private LdifReader ldifReader; + private HBaseStore store; + private int count; + + + protected void setup( Mapper.Context context ) throws IOException, + InterruptedException + { + try + { + String countAsString = context.getConfiguration().get( NAME_COMPONENT_COUNT, "0" ); + count = Integer.parseInt( countAsString ); + + ldifReader = new LdifReader(); + + JarLdifSchemaLoader schemaLoader = new JarLdifSchemaLoader(); + SchemaManager schemaManager = new DefaultSchemaManager( schemaLoader ); + schemaManager.loadAllEnabled(); + + LdapDN suffixDn = new LdapDN( context.getConfiguration().get( SUFFIX ) ); + suffixDn.normalize( schemaManager.getNormalizerMapping() ); + + store = new HBaseStore(); + store.setSuffixDn( suffixDn.getName() ); + store.setCacheSize( 100 ); + store.setTablePrefix( context.getConfiguration().get( TABLE_PREFIX ) ); + store.init( schemaManager ); + store.getConfiguration().setBoolean( HBaseStore.STRONG_CONSISTENCY_PROPERTY, false ); + store.getConfiguration().setBoolean( HBaseMasterTable.MAINTAIN_COUNTERS_PROPERTY, false ); + } + catch ( Throwable e ) + { + e.printStackTrace(); + throw new IOException( e ); + } + } + + + protected void cleanup( Mapper.Context context ) throws IOException, + InterruptedException + { + try + { + store.destroy(); + ldifReader.close(); + } + catch ( Throwable e ) + { + e.printStackTrace(); + throw new IOException( e ); + } + } + + + @Override + public void map( Object key, Text value, Context context ) throws IOException, InterruptedException + { + String record = value.toString(); + // prepend the version + //record = "version: 1\n\n" + record; + + try + { + List ldifEntries = ldifReader.parseLdif( record ); + for ( LdifEntry ldifEntry : ldifEntries ) + { + if ( ldifEntry.isEntry() ) + { + 
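                    // [Editor's note -- not part of the commit. Only entries
                    // whose DN has exactly NAME_COMPONENT_COUNT name components
                    // are imported on a given pass: with the property set to 3,
                    // "ou=users,dc=example,dc=com" is taken while
                    // "dc=example,dc=com" is skipped. Running one pass per
                    // level apparently guarantees that parents already exist in
                    // the master table when fetchParentIds() resolves them for
                    // their children.]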
LdapDN dn = ldifEntry.getDn(); + int size = dn.size(); + if ( size == count ) + { + importLdifEntry( ldifEntry, context ); + } + } + } + } + catch ( NamingException e ) + { + System.err.println( "Error parsing LDIF: " ); + System.err.println( ">>>" + record + "<<<" ); + e.printStackTrace(); + } + } + + + private void importLdifEntry( LdifEntry ldifEntry, Context context ) throws IOException + { + try + { + // convert LDIF entry to server entry + ServerEntry entry = new DefaultServerEntry( store.getSchemaManager(), ldifEntry.getEntry() ); + + // add operational attributes + entry.put( SchemaConstants.ENTRY_UUID_AT, UUID.randomUUID().toString() ); + entry.put( SchemaConstants.ENTRY_CSN_AT, CSN_FACTORY.newInstance().toString() ); + entry.put( SchemaConstants.CREATORS_NAME_AT, ServerDNConstants.ADMIN_SYSTEM_DN_NORMALIZED ); + entry.put( SchemaConstants.CREATE_TIMESTAMP_AT, DateUtils.getGeneralizedTime() ); + + // write to HBase + store.getMasterTable().add( entry ); + + context.getCounter( Counters.IMPORTED_ENTRIES ).increment( 1 ); + } + catch ( Throwable e ) + { + System.err.println( "Error importing Entry: " ); + System.err.println( ">>>" + ldifEntry + "<<<" ); + e.printStackTrace(); + throw new IOException( e ); + } + } + +} Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java?rev=912434&view=auto ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java (added) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java Sun Feb 21 22:59:19 2010 @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.directory.server.core.partition.hbase.mapreduce; + + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + + +/** + * An {@link InputFormat} for LDIF files. Files are broken into LDIF records. + * Either linefeed or carriage-return are used to signal end of line. Keys are + * the position in the file, and values are the LDIF records. 
+ * + * @author Apache Directory Project + * @version $Rev$, $Date$ + */ +public class LdifInputFormat extends TextInputFormat +{ + + @Override + public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context ) + { + return new LdifRecordReader(); + } + +} Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java?rev=912434&view=auto ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java (added) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java Sun Feb 21 22:59:19 2010 @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.directory.server.core.partition.hbase.mapreduce; + + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.util.LineReader; + + +/** + * Treats keys as offset in file and value as LDIF record. 
+ * + * @author Apache Directory Project + * @version $Rev$, $Date$ + */ +public class LdifRecordReader extends RecordReader +{ + private static final Log LOG = LogFactory.getLog( LdifRecordReader.class ); + + private CompressionCodecFactory compressionCodecs = null; + private long start; + private long pos; + private long end; + private LineReader in; + + private LongWritable key = null; + private Text value = null; + + private String dn = null; + + + public void initialize( InputSplit genericSplit, TaskAttemptContext context ) throws IOException + { + FileSplit split = ( FileSplit ) genericSplit; + Configuration job = context.getConfiguration(); + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + compressionCodecs = new CompressionCodecFactory( job ); + final CompressionCodec codec = compressionCodecs.getCodec( file ); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem( job ); + FSDataInputStream fileIn = fs.open( split.getPath() ); + boolean skipFirstLine = false; + if ( codec != null ) + { + in = new LineReader( codec.createInputStream( fileIn ), job ); + end = Long.MAX_VALUE; + } + else + { + if ( start != 0 ) + { + skipFirstLine = true; + --start; + fileIn.seek( start ); + } + in = new LineReader( fileIn, job ); + } + if ( skipFirstLine ) + { // skip first line and re-establish "start". + start += in.readLine( new Text(), 0, ( int ) Math.min( ( long ) Integer.MAX_VALUE, end - start ) ); + } + this.pos = start; + + LOG.debug( "LdifRecordReader: start=" + start + ", end=" + end ); + } + + + public boolean nextKeyValue() throws IOException + { + if ( key == null ) + { + key = new LongWritable(); + } + key.set( pos ); + + if ( value == null ) + { + value = new Text(); + } + value.clear(); + + boolean withinRecord = false; + StringBuilder sb = new StringBuilder(); + while ( pos < end || withinRecord ) + { + Text temp = new Text(); + int size = in.readLine( temp ); + if ( size == 0 ) + { + // end of file + break; + } + + String line = temp.toString(); + pos += size; + + // record must start with "dn:" + if ( !withinRecord ) + { + if ( line.startsWith( "dn:" ) ) + { + key.set( pos - size ); + withinRecord = true; + + if ( dn == null ) + { + LOG.debug( "First record at pos " + key + ": " + line ); + } + dn = line; + } + else + { + continue; + } + } + + sb.append( line ); + sb.append( '\n' ); + + // record ends with an empty line + if ( line.trim().isEmpty() && !line.startsWith( " " ) ) + { + withinRecord = false; + break; + } + } + if ( sb.length() == 0 ) + { + LOG.debug( "Last record at pos " + key + ": " + dn ); + key = null; + value = null; + return false; + } + else + { + value.set( sb.toString() ); + return true; + } + } + + + @Override + public LongWritable getCurrentKey() + { + return key; + } + + + @Override + public Text getCurrentValue() + { + return value; + } + + + /** + * Get the progress within the split + */ + public float getProgress() + { + if ( start == end ) + { + return 0.0f; + } + else + { + return Math.min( 1.0f, ( pos - start ) / ( float ) ( end - start ) ); + } + } + + + public synchronized void close() throws IOException + { + LOG.debug( "Close record at pos " + key + ": " + dn ); + if ( in != null ) + { + in.close(); + } + } +} Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java URL: 
http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java Sun Feb 21 22:59:19 2010 @@ -25,6 +25,7 @@ import java.util.NavigableMap; import org.apache.directory.server.core.partition.hbase.Cache; +import org.apache.directory.shared.ldap.schema.AttributeType; import org.apache.directory.shared.ldap.schema.SchemaManager; import org.apache.directory.shared.ldap.util.Base64; import org.apache.directory.shared.ldap.util.ByteBuffer; @@ -64,13 +65,49 @@ public int count( Object value ) throws Exception { - byte[] countKey = getCountKey( value ); + byte[] countKey = getEqualsKey( value ); Info info = fetchInfo( countKey ); if ( info == null ) { return 0; } - return info.count.intValue(); + return info.candidates.size(); + } + + + /** + * Gets the equals key. + * The key has the following syntax: + *
  =value
+ * where value is the normalized value. + * + * @param value the value + * @return the count row key for the value + * @throws Exception + */ + private byte[] getEqualsKey( Object value ) throws Exception + { + if ( value == null ) + { + return null; + } + + ByteBuffer bb = new ByteBuffer(); + + bb.append( '=' ); + + byte[] normValue = getNormalized( value ); + bb.append( normValue ); + + return bb.copyOfUsedBytes(); + } + + + public Object extractValueFromEqualsKey( byte[] row ) throws Exception + { + byte[] value = Bytes.tail( row, row.length - 1 ); + AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid ); + return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value; } @@ -78,7 +115,7 @@ { ByteBuffer bb = new ByteBuffer(); - bb.append( '#' ); + bb.append( '=' ); // add value // there are special values to support attribute scan @@ -112,8 +149,8 @@ public List getColumnCandidates( Object value ) throws Exception { - byte[] countKey = getCountKey( value ); - Info info = fetchInfo( countKey ); + byte[] equalsKey = getEqualsKey( value ); + Info info = fetchInfo( equalsKey ); if ( info == null ) { return null; @@ -140,8 +177,8 @@ */ public boolean exists( Object value, Long id ) throws Exception { - byte[] countKey = getCountKey( value ); - Info info = fetchInfo( countKey ); + byte[] equalsKey = getEqualsKey( value ); + Info info = fetchInfo( equalsKey ); if ( info == null ) { return false; @@ -180,7 +217,7 @@ String key = String.valueOf( Base64.encode( row ) ); Info info = new Info(); - info.value = getValueFromCountKey( row ); + info.value = extractValueFromEqualsKey( row ); infoCache.put( key, info ); if ( result.isEmpty() ) @@ -196,10 +233,6 @@ { info.candidates.add( Bytes.toLong( qualifier ) ); } - else if ( Bytes.equals( COUNT_QUALIFIER, qualifier ) ) - { - info.count = Bytes.toLong( result.getFamilyMap( INFO_FAMILY ).get( qualifier ) ); - } } return info; } @@ -207,23 +240,11 @@ public void add( byte[] value, Long id ) throws Exception { - // exact match (attribute=value): #value -> count, id - // check first if the index already exists because we won't increment the index count - byte[] exactCountRow = getCountKey( value ); - Get exactGet = new Get( exactCountRow ); - exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) ); - if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) ) - { - // get+put+put is not atomic! 
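        // [Editor's note -- not part of the commit. This removed block is the
        // "counters don't scale" item from the log: each add() used to do a
        // get, a put and an increment against the same "#value" row, which
        // serializes writers on popular values and, as the comment above
        // warns, is not atomic. After this change every entry id is stored as
        // its own qualifier under the "=value" row, so count() simply returns
        // the number of qualifiers. A sketch of the read side, assuming an
        // HTable handle named htable:
        //
        //     Result r = htable.get( new Get( equalsKey ) );
        //     int count = r.getFamilyMap( INFO_FAMILY ).size();
        // ]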
- Put exactPut = new Put( exactCountRow ); - //exactPut.setWriteToWAL( false ); - exactPut.add( INFO_FAMILY, Bytes.toBytes( id ), Bytes.toBytes( id ) ); - HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut ); - - // increment exact match count: #value -> count - HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY, - COUNT_QUALIFIER ); - } + // exact match (attribute=value): =value -> id + byte[] equalsKey = getEqualsKey( value ); + Put exactPut = new Put( equalsKey ); + exactPut.add( INFO_FAMILY, Bytes.toBytes( id ), Bytes.toBytes( id ) ); + HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut ); // TODO: optimize - don't need to clear the whole cache infoCache.clear(); @@ -234,22 +255,11 @@ public void drop( byte[] value, Long id ) throws Exception { - // exact match (attribute=value): #value -> count, id - // check first if the index exists because we won't decrement the index count otherwise - byte[] exactCountRow = getCountKey( value ); - Get exactGet = new Get( exactCountRow ); - exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) ); - if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) ) - { - Delete exactDel = new Delete( exactCountRow ); - exactDel.deleteColumn( INFO_FAMILY, Bytes.toBytes( id ) ); - HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel ); - - // decrement exact match count: #value -> count - HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY, - COUNT_QUALIFIER ); - // TODO: delete column if count is 0? - } + // exact match (attribute=value): =value -> id + byte[] equalsKey = getEqualsKey( value ); + Delete exactDel = new Delete( equalsKey ); + exactDel.deleteColumn( INFO_FAMILY, Bytes.toBytes( id ) ); + HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel ); // TODO: optimize - don't need to clear the whole cache infoCache.clear(); @@ -260,7 +270,6 @@ class Info { Object value; - Long count = 0L; List candidates = new ArrayList(); } Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java Sun Feb 21 22:59:19 2010 @@ -36,7 +36,6 @@ public static final byte[] INFO_FAMILY = Bytes.toBytes( "info" ); public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" ); - public static final byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" ); public static final byte[] VALUE_SCAN_START = new byte[] { 0x00 }; public static final byte[] VALUE_SCAN_STOP = new byte[] @@ -66,4 +65,10 @@ public abstract ResultScanner getScanner( Scan scan ) throws Exception; + + public abstract void add( byte[] value, Long id ) throws Exception; + + + public abstract void drop( byte[] value, Long id ) throws Exception; + } \ No newline at end of file Modified: 
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java Sun Feb 21 22:59:19 2010 @@ -25,9 +25,7 @@ import org.apache.directory.shared.ldap.entry.client.ClientBinaryValue; import org.apache.directory.shared.ldap.schema.AttributeType; import org.apache.directory.shared.ldap.schema.SchemaManager; -import org.apache.directory.shared.ldap.util.ByteBuffer; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -44,10 +42,10 @@ protected String attributeTypeOid; protected String indexTableName; - private HTablePool indexTablePool; + private HBaseTablePool indexTablePool; protected SchemaManager schemaManager; protected HBaseConfiguration configuration; - protected Cache countCache; + protected Cache countCache; protected Cache existsCache; @@ -59,13 +57,17 @@ this.configuration = configuration; String name = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid ); this.indexTableName = tablePrefix + "index_" + name; - this.countCache = new Cache( cacheSize ); + this.countCache = new Cache( cacheSize ); this.existsCache = new Cache( cacheSize ); } public void close() throws Exception { + if ( indexTablePool != null ) + { + indexTablePool.close(); + } } @@ -75,12 +77,6 @@ } - public abstract void add( byte[] value, Long id ) throws Exception; - - - public abstract void drop( byte[] value, Long id ) throws Exception; - - protected byte[] getNormalized( Object value ) throws Exception { AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid ); @@ -113,51 +109,15 @@ } - protected HTablePool getIndexTablePool() throws Exception + protected HBaseTablePool getIndexTablePool() throws Exception { if ( indexTablePool == null ) { // ensure table is created HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY ); - indexTablePool = new HTablePool( configuration, 16 ); + indexTablePool = new HBaseTablePool( indexTableName, configuration ); } return indexTablePool; } - - /** - * Gets the count key. - * The key has the following syntax: - *
  #value
- * where value is the normalized value. - * - * @param value the value - * @return the count row key for the value - * @throws Exception - */ - protected byte[] getCountKey( Object value ) throws Exception - { - if ( value == null ) - { - return null; - } - - ByteBuffer bb = new ByteBuffer(); - - bb.append( '#' ); - - byte[] normValue = getNormalized( value ); - bb.append( normValue ); - - return bb.copyOfUsedBytes(); - } - - - public Object getValueFromCountKey( byte[] row ) throws Exception - { - byte[] value = Bytes.tail( row, row.length - 1 ); - AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid ); - return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value; - } - } \ No newline at end of file Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java Sun Feb 21 22:59:19 2010 @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -86,14 +85,16 @@ public static final String MASTER_TABLE = "master"; public static final String TREE_TABLE = "tree"; + public static final String MAINTAIN_COUNTERS_PROPERTY = "org.apache.directory.maintain.counters"; private HBaseConfiguration configuration; + private boolean maintainCounters; private SchemaManager schemaManager; private LdapDN suffixDn; - private HTablePool masterTablePool; + private HBaseTablePool masterTablePool; private String masterTableName; - private HTablePool treeTablePool; + private HBaseTablePool treeTablePool; private String treeTableName; private MasterTreeInfo suffixMti; @@ -121,6 +122,7 @@ this.schemaManager = store.getSchemaManager(); this.suffixDn = store.getSuffix(); this.configuration = store.getConfiguration(); + this.maintainCounters = configuration.getBoolean( MAINTAIN_COUNTERS_PROPERTY, true ); this.masterTableName = store.getTablePrefix() + MASTER_TABLE; this.treeTableName = store.getTablePrefix() + TREE_TABLE; this.suffixMti = new MasterTreeInfo( ROOT_ID, suffixDn.getNormName(), null ); @@ -141,27 +143,40 @@ public void close() throws Exception { + if ( masterTablePool != null ) + { + masterTablePool.close(); + } + if ( treeTablePool != null ) + { + treeTablePool.close(); + } } public Long add( ServerEntry entry ) throws Exception { + Long id = addToMaster( entry ); + addToTree( id, entry ); + return id; + } + + + public Long addToMaster( ServerEntry entry ) throws Exception + { Long id = nextId(); List parentIds = fetchParentIds( entry.getDn(), false ); String upRdn; String normRdn; - MasterTreeInfo treeTableKey; if ( entry.getDn().equals( suffixDn ) ) { upRdn = entry.getDn().getName(); normRdn = entry.getDn().getNormName(); - 
treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn ); } else { upRdn = entry.getDn().getRdn().getUpName(); normRdn = entry.getDn().getRdn().getNormName(); - treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn ); } // put to master and tree table @@ -169,12 +184,9 @@ masterPut.add( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER, Bytes.toBytes( parentIds.get( 0 ) ) ); masterPut.add( TREE_INFO_FAMILY, UP_RDN_QUALIFIER, Bytes.toBytes( upRdn ) ); masterPut.add( TREE_INFO_FAMILY, NORM_RDN_QUALIFIER, Bytes.toBytes( normRdn ) ); - Put treePut = new Put( treeTableKey.treeTableKey ); - treePut.add( TREE_INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) ); for ( EntryAttribute attribute : entry ) { String attr = attribute.getUpId(); - String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid(); for ( int i = 0; i < attribute.size(); i++ ) { Value value = attribute.get( i ); @@ -184,6 +196,42 @@ // objectClass1 -> top masterPut.add( UP_ATTRIBUTES_FAMILY, Bytes.add( Bytes.toBytes( attr ), Bytes.toBytes( i ) ), value .getBytes() ); + } + } + HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut ); + + return id; + } + + + public Long addToTree( Long id, ServerEntry entry ) throws Exception + { + List parentIds = fetchParentIds( entry.getDn(), false ); + String upRdn; + String normRdn; + MasterTreeInfo treeTableKey; + if ( entry.getDn().equals( suffixDn ) ) + { + upRdn = entry.getDn().getName(); + normRdn = entry.getDn().getNormName(); + treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn ); + } + else + { + upRdn = entry.getDn().getRdn().getUpName(); + normRdn = entry.getDn().getRdn().getNormName(); + treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn ); + } + + // put to tree table + Put treePut = new Put( treeTableKey.treeTableKey ); + treePut.add( TREE_INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) ); + for ( EntryAttribute attribute : entry ) + { + String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid(); + for ( int i = 0; i < attribute.size(); i++ ) + { + Value value = attribute.get( i ); // normAttributes: // 2.5.4.0:inetorgperson -> 0 @@ -205,31 +253,33 @@ } } } - HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut ); HBaseTableHelper.put( getTreeTablePool(), treeTableName, treePut ); - // update parent one-level count - MasterTreeInfo parentKey = fetchMasterTreeInfo( parentIds.get( 0 ) ); - if ( parentKey != null ) + if ( maintainCounters ) { - HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY, - ONE_LEVEL_COUNT_QUALIFIER ); - } - - // update all parents sub-level count - for ( Long parentId : parentIds ) - { - parentKey = fetchMasterTreeInfo( parentId ); + // update parent one-level count + MasterTreeInfo parentKey = fetchMasterTreeInfo( parentIds.get( 0 ) ); if ( parentKey != null ) { HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey, - TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER ); + TREE_INFO_FAMILY, ONE_LEVEL_COUNT_QUALIFIER ); } - } - // clear caches - oneLevelCountCache.clear(); - subLevelCountCache.clear(); + // update all parents sub-level count + for ( Long parentId : parentIds ) + { + parentKey = fetchMasterTreeInfo( parentId ); + if ( parentKey != null ) + { + HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey, + TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER ); + } + } + + // clear caches + oneLevelCountCache.clear(); + 
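        // [Editor's note -- not part of the commit. The nextId() change below
        // allocates ids in blocks: a single HBaseTableHelper.increment with
        // amount=100 against the sequence row reserves a range, e.g. if the
        // cell held 200 the call returns 300 and this instance hands out
        // 201..300 from memory. That is one sequence-row RPC per 100 entries
        // instead of one per entry; a killed task just leaves an unused gap in
        // the id space, which is harmless for entry ids. The block list is per
        // HBaseMasterTable instance and nextId() is unsynchronized, so it
        // appears to assume a single-threaded caller, as in the mappers
        // above.]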
subLevelCountCache.clear(); + } return id; } @@ -247,22 +297,25 @@ Delete treeDel = new Delete( key.treeTableKey ); HBaseTableHelper.delete( getTreeTablePool(), treeTableName, treeDel ); - // update parent one-level count - Long parentId = key.parentId; - if ( parentId > ROOT_ID ) - { - MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId ); - HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY, - ONE_LEVEL_COUNT_QUALIFIER ); - } - - // update sub-level count of all parents - while ( parentId > ROOT_ID ) - { - MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId ); - HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY, - SUB_LEVEL_COUNT_QUALIFIER ); - parentId = parentKey.parentId; + if ( maintainCounters ) + { + // update parent one-level count + Long parentId = key.parentId; + if ( parentId > ROOT_ID ) + { + MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId ); + HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, + TREE_INFO_FAMILY, ONE_LEVEL_COUNT_QUALIFIER ); + } + + // update sub-level count of all parents + while ( parentId > ROOT_ID ) + { + MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId ); + HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, + TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER ); + parentId = parentKey.parentId; + } } // clear caches @@ -590,32 +643,46 @@ return fetchId( suffixMti ); } + private List ids = new ArrayList(); + private long nextId() throws Exception { - byte[] id = HBaseTableHelper.increment( getMasterTablePool(), masterTableName, SEQUENCE_ROW, TREE_INFO_FAMILY, - SEQUENCE_QUALIFIER ); - return Bytes.toLong( id ); + if ( ids.isEmpty() ) + { + long amount = 100; + byte[] id = HBaseTableHelper.increment( getMasterTablePool(), masterTableName, SEQUENCE_ROW, + TREE_INFO_FAMILY, SEQUENCE_QUALIFIER, amount ); + long upper = Bytes.toLong( id ); + long lower = upper - amount + 1; + for ( long l = lower; l <= upper; l++ ) + { + ids.add( l ); + } + } + + Long id = ids.remove( 0 ); + return id; } - private HTablePool getMasterTablePool() throws Exception + private HBaseTablePool getMasterTablePool() throws Exception { if ( masterTablePool == null ) { HBaseTableHelper.createTable( configuration, masterTableName, TREE_INFO_FAMILY, UP_ATTRIBUTES_FAMILY ); - masterTablePool = new HTablePool( configuration, 16 ); + masterTablePool = new HBaseTablePool( masterTableName, configuration ); } return masterTablePool; } - private HTablePool getTreeTablePool() throws Exception + private HBaseTablePool getTreeTablePool() throws Exception { if ( treeTablePool == null ) { HBaseTableHelper.createTable( configuration, treeTableName, TREE_INFO_FAMILY, NORM_ATTRIBUTES_FAMILY ); - treeTablePool = new HTablePool( configuration, 16 ); + treeTablePool = new HBaseTablePool( treeTableName, configuration ); } return treeTablePool; } @@ -697,4 +764,16 @@ } } + + public String getTreeTableName() + { + return treeTableName; + } + + + public String getMasterTableName() + { + return masterTableName; + } + } Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff 
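The HBaseMasterTable changes above make three adjustments that belong together. The one-level and sub-level counters are no longer updated unconditionally: all counter maintenance is guarded by a flag read from the configuration, defaulting to true. A minimal sketch of switching the counters off, e.g. before a bulk import, assuming the Hadoop/HBase 0.20-style HBaseConfiguration used throughout this patch:

    // illustrative only: disable per-entry counter maintenance before
    // handing the configuration to the store/partition
    HBaseConfiguration conf = new HBaseConfiguration();
    conf.setBoolean( HBaseMasterTable.MAINTAIN_COUNTERS_PROPERTY, false );

In the same vein, add() is now a thin composition of addToMaster() and addToTree(), so the new Map/Reduce import can write the master and tree tables in separate phases, and nextId() no longer costs one HBase increment per entry: a single increment reserves a block of 100 ids that are then handed out locally. A self-contained sketch of that allocation pattern, with an in-memory counter standing in for the remote increment on SEQUENCE_ROW (ids left over in a block when the process stops are simply skipped, which is harmless for surrogate keys):

    import java.util.LinkedList;
    import java.util.Queue;

    /** Sketch of the block-allocating sequence behind nextId(). */
    public class BlockSequence
    {
        private final Queue<Long> ids = new LinkedList<Long>();
        private final long blockSize;
        private long remoteCounter; // stands in for the HBase sequence cell

        public BlockSequence( long blockSize )
        {
            this.blockSize = blockSize;
        }

        // stands in for HBaseTableHelper.increment( ..., amount ):
        // one call reserves a whole block instead of a single id
        private long incrementRemote( long amount )
        {
            remoteCounter += amount;
            return remoteCounter;
        }

        public synchronized long next()
        {
            if ( ids.isEmpty() )
            {
                long upper = incrementRemote( blockSize );
                for ( long l = upper - blockSize + 1; l <= upper; l++ )
                {
                    ids.add( l );
                }
            }
            return ids.poll();
        }
    }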
============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java Sun Feb 21 22:59:19 2010 @@ -21,13 +21,14 @@ import org.apache.directory.server.core.partition.hbase.Cache; +import org.apache.directory.server.core.partition.hbase.HBaseStore; +import org.apache.directory.server.core.partition.hbase.cursor.HBasePresenceIndexCursor; import org.apache.directory.shared.ldap.schema.SchemaManager; import org.apache.directory.shared.ldap.util.Base64; import org.apache.directory.shared.ldap.util.ByteBuffer; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -48,35 +49,35 @@ public static final byte[] COUNT_ROW = Bytes.toBytes( "!" ); public static final byte[] INFO_FAMILY = Bytes.toBytes( "info" ); public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" ); - public static final byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" ); public static final byte[] VALUE_SCAN_FIRST_ENTRYID = new byte[] { 0x00 }; public static final byte[] VALUE_SCAN_LAST_ENTRYID = new byte[] { ( byte ) 0xFF }; protected String indexTableName; - private HTablePool indexTablePool; - private SchemaManager schemaManager; - private HBaseConfiguration configuration; + private HBaseTablePool indexTablePool; + private HBaseStore store; private String attributeTypeOid; - private Cache countCache; + private Cache countCache; - public HBasePresenceIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix, - HBaseConfiguration configuration, int cacheSize ) throws Exception + public HBasePresenceIndexTable( String attributeTypeOid, HBaseStore store, int cacheSize ) throws Exception { this.attributeTypeOid = attributeTypeOid; - this.schemaManager = schemaManager; - this.configuration = configuration; - String name = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid ); - this.indexTableName = tablePrefix + "index_" + name; - this.countCache = new Cache( cacheSize ); + this.store = store; + String name = store.getSchemaManager().getGlobalOidRegistry().getPrimaryName( attributeTypeOid ); + this.indexTableName = store.getTablePrefix() + "index_" + name; + this.countCache = new Cache( cacheSize ); } public void close() throws Exception { + if ( indexTablePool != null ) + { + indexTablePool.close(); + } } @@ -121,10 +122,23 @@ return countCache.get( attributeTypeOid ).intValue(); } - Long count = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, - COUNT_QUALIFIER, 0L ); + // TODO: scan directly instead of using the cursor? 
+ HBasePresenceIndexCursor cursor = new HBasePresenceIndexCursor( this ); + int count = 0; + int limit = 100; + while ( cursor.next() && count <= limit ) + { + count++; + } + if ( count >= limit ) + { + // this is just a guess to avoid subtree scan + count = store.count() / 10; + } + cursor.close(); + countCache.put( attributeTypeOid, count ); - return count.intValue(); + return count; } @@ -153,20 +167,10 @@ public void add( Long entryId ) throws Exception { // presence (attribute=*): * -> id - // check first if the index already exists because we won't increment the index count byte[] presenceRow = getPresenceKey( Bytes.toBytes( entryId ) ); - Get presenceGet = new Get( presenceRow ); - if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) ) - { - // get+put+put is not atomic! - Put presencePut = new Put( presenceRow ); - presencePut.setWriteToWAL( false ); - presencePut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( entryId ) ); - HBaseTableHelper.put( getIndexTablePool(), indexTableName, presencePut ); - - // increment existence count: attribute: -> count - HBaseTableHelper.increment( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER ); - } + Put presencePut = new Put( presenceRow ); + presencePut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( entryId ) ); + HBaseTableHelper.put( getIndexTablePool(), indexTableName, presencePut ); countCache.clear(); } @@ -175,29 +179,21 @@ public void drop( Long entryId ) throws Exception { // presence (attribute=*): * -> id - // check first if the index exists because we won't decrement the index count otherwise byte[] presenceRow = getPresenceKey( Bytes.toBytes( entryId ) ); - Get presenceGet = new Get( presenceRow ); - if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) ) - { - Delete presenceDel = new Delete( presenceRow ); - HBaseTableHelper.delete( getIndexTablePool(), indexTableName, presenceDel ); - - // decrement existence count: attribute: -> count - HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER ); - } + Delete presenceDel = new Delete( presenceRow ); + HBaseTableHelper.delete( getIndexTablePool(), indexTableName, presenceDel ); countCache.clear(); } - protected HTablePool getIndexTablePool() throws Exception + protected HBaseTablePool getIndexTablePool() throws Exception { if ( indexTablePool == null ) { // ensure table is created - HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY ); - indexTablePool = new HTablePool( configuration, 16 ); + HBaseTableHelper.createTable( store.getConfiguration(), indexTableName, INFO_FAMILY ); + indexTablePool = new HBaseTablePool( indexTableName, store.getConfiguration() ); } return indexTablePool; } Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff ============================================================================== --- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java (original) +++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java Sun Feb 21 22:59:19 2010 @@ 
-20,11 +20,13 @@ package org.apache.directory.server.core.partition.hbase.table; +import org.apache.directory.server.core.entry.ServerEntry; +import org.apache.directory.server.core.partition.hbase.HBaseStore; +import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex; +import org.apache.directory.server.xdbm.IndexCursor; import org.apache.directory.shared.ldap.schema.AttributeType; -import org.apache.directory.shared.ldap.schema.SchemaManager; import org.apache.directory.shared.ldap.util.Base64; import org.apache.directory.shared.ldap.util.ByteBuffer; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -45,12 +47,13 @@ public class HBaseRowIndexTable extends HBaseIndexTableBase { private static final Logger LOG = LoggerFactory.getLogger( HBaseRowIndexTable.class ); + private HBaseStore store; - public HBaseRowIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix, - HBaseConfiguration configuration, int cacheSize ) throws Exception + public HBaseRowIndexTable( String attributeTypeOid, HBaseStore store, int cacheSize ) throws Exception { - super( attributeTypeOid, schemaManager, tablePrefix, configuration, cacheSize ); + super( attributeTypeOid, store.getSchemaManager(), store.getTablePrefix(), store.getConfiguration(), cacheSize ); + this.store = store; } @@ -146,15 +149,24 @@ return countCache.get( value ).intValue(); } - byte[] row = getCountKey( value ); - if ( row == null ) + // TODO: scan directly instead of using the cursor? + HBaseUserIndex index = store.getUserIndex( attributeTypeOid ); + IndexCursor cursor = index.forwardCursor( value ); + int count = 0; + int limit = 100; + while ( cursor.next() && count <= limit ) + { + count++; + } + if ( count >= limit ) { - return 0; + // this is just a guess to avoid subtree scan + count = store.count() / 10; } - Long count = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, row, INFO_FAMILY, - COUNT_QUALIFIER, 0L ); + cursor.close(); + countCache.put( value, count ); - return count.intValue(); + return count; } @@ -181,22 +193,10 @@ public void add( byte[] value, Long id ) throws Exception { // exact match (attribute=value): =value -> id, value - // check first if the index already exists because we won't increment the index count byte[] exactRow = getEqualsKey( value, id ); - Get exactGet = new Get( exactRow ); - if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) ) - { - // get+put+put is not atomic! 
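Both count() implementations above (presence index and row index) now share the same estimation pattern in place of the removed counter cells: walk the index cursor up to a small limit, and once the limit is reached return a coarse guess of one tenth of all entries rather than paying for a full scan. A compact sketch of the pattern, with a plain Iterator standing in for the ApacheDS IndexCursor and a total parameter standing in for store.count():

    import java.util.Iterator;

    final class CountEstimator
    {
        /** Exact below 'limit'; above it, guess total/10 to avoid a full scan. */
        static int estimate( Iterator<?> hits, int limit, int total )
        {
            int count = 0;
            while ( hits.hasNext() && count < limit )
            {
                hits.next();
                count++;
            }
            // above the limit the exact number no longer matters: any
            // sufficiently large guess steers the optimizer away from
            // this index
            return count >= limit ? total / 10 : count;
        }
    }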
- Put exactPut = new Put( exactRow ); - //exactPut.setWriteToWAL( false ); - exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) ); - HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut ); - - // increment exact match count: #value -> count - byte[] exactCountRow = getCountKey( value ); - HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY, - COUNT_QUALIFIER ); - } + Put exactPut = new Put( exactRow ); + exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) ); + HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut ); // TODO: optimize - don't need to clear the whole cache countCache.clear(); @@ -207,20 +207,9 @@ public void drop( byte[] value, Long id ) throws Exception { // exact match (attribute=value): =value -> id - // check first if the index exists because we won't decrement the index count otherwise byte[] exactRow = getEqualsKey( value, id ); - Get exactGet = new Get( exactRow ); - if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) ) - { - Delete exactDel = new Delete( exactRow ); - HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel ); - - // decrement exact match count: #value -> count - byte[] exactCountRow = getCountKey( value ); - HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY, - COUNT_QUALIFIER ); - // TODO: delete column if count is 0? - } + Delete exactDel = new Delete( exactRow ); + HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel ); // TODO: optimize - don't need to clear the whole cache countCache.clear();
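A closing note on the simplified add() and drop() above: the previous check-then-write sequence existed only to keep the per-value count cells in sync, and as the removed comment concedes, get+put+put was not atomic anyway. With the counters gone the writes can be unconditional, because an index row is idempotent: its key encodes the value (and, for equality rows, the entry id), so re-adding writes the identical row and re-dropping a missing row is a no-op. A rough sketch of the idempotent write, assuming a key layout of roughly '=' + value + id as the "=value -> id" comments in this patch suggest:

    // illustrative only: the row key itself carries all the information,
    // so no prior Get/exists round trip is needed
    byte[] exactRow = Bytes.add( Bytes.toBytes( "=" ),
        Bytes.add( value, Bytes.toBytes( id ) ) );
    Put exactPut = new Put( exactRow );
    exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
    HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );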